diff --git a/rumqttc/README.md b/rumqttc/README.md index 884e90a0..e072248f 100644 --- a/rumqttc/README.md +++ b/rumqttc/README.md @@ -65,6 +65,8 @@ Quick overview of features - Queue size based flow control on outgoing packets - Automatic reconnections by just continuing the `eventloop.poll()/connection.iter()` loop - Natural backpressure to client APIs during bad network +- Support for WebSockets +- Secure transport using TLS In short, everything necessary to maintain a robust connection diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index 9fe70653..84436d7d 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -30,15 +30,22 @@ impl From> for ClientError { } } -/// `AsyncClient` to communicate with MQTT `Eventloop` -/// This is cloneable and can be used to asynchronously Publish, Subscribe. +/// An asynchronous client, communicates with MQTT `EventLoop`. +/// +/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`), +/// [`subscribe`](`AsynClient::subscribe`) through the `EventLoop`, which is to be polled parallelly. +/// +/// **NOTE**: The `EventLoop` must be regularly polled in order to send, receive and process packets +/// from the broker, i.e. move ahead. #[derive(Clone, Debug)] pub struct AsyncClient { request_tx: Sender, } impl AsyncClient { - /// Create a new `AsyncClient` + /// Create a new `AsyncClient`. + /// + /// `cap` specifies the capacity of the bounded async channel. pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) { let eventloop = EventLoop::new(options, cap); let request_tx = eventloop.handle(); @@ -54,7 +61,7 @@ impl AsyncClient { AsyncClient { request_tx } } - /// Sends a MQTT Publish to the eventloop + /// Sends a MQTT Publish to the `EventLoop`. pub async fn publish( &self, topic: S, @@ -73,7 +80,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT Publish to the eventloop + /// Attempts to send a MQTT Publish to the `EventLoop`. pub fn try_publish( &self, topic: S, @@ -92,7 +99,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set. + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); @@ -102,7 +109,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set. + /// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); if let Some(ack) = ack { @@ -111,7 +118,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT Publish to the eventloop + /// Sends a MQTT Publish to the `EventLoop` pub async fn publish_bytes( &self, topic: S, @@ -129,7 +136,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT Subscribe to the eventloop + /// Sends a MQTT Subscribe to the `EventLoop` pub async fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { let subscribe = Subscribe::new(topic.into(), qos); let request = Request::Subscribe(subscribe); @@ -137,7 +144,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT Subscribe to the eventloop + /// Attempts to send a MQTT Subscribe to the `EventLoop` pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { let subscribe = Subscribe::new(topic.into(), qos); let request = Request::Subscribe(subscribe); @@ -145,7 +152,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT Subscribe for multiple topics to the eventloop + /// Sends a MQTT Subscribe for multiple topics to the `EventLoop` pub async fn subscribe_many(&self, topics: T) -> Result<(), ClientError> where T: IntoIterator, @@ -156,7 +163,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT Subscribe for multiple topics to the eventloop + /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop` pub fn try_subscribe_many(&self, topics: T) -> Result<(), ClientError> where T: IntoIterator, @@ -167,7 +174,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT Unsubscribe to the eventloop + /// Sends a MQTT Unsubscribe to the `EventLoop` pub async fn unsubscribe>(&self, topic: S) -> Result<(), ClientError> { let unsubscribe = Unsubscribe::new(topic.into()); let request = Request::Unsubscribe(unsubscribe); @@ -175,7 +182,7 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT Unsubscribe to the eventloop + /// Attempts to send a MQTT Unsubscribe to the `EventLoop` pub fn try_unsubscribe>(&self, topic: S) -> Result<(), ClientError> { let unsubscribe = Unsubscribe::new(topic.into()); let request = Request::Unsubscribe(unsubscribe); @@ -183,14 +190,14 @@ impl AsyncClient { Ok(()) } - /// Sends a MQTT disconnect to the eventloop + /// Sends a MQTT disconnect to the `EventLoop` pub async fn disconnect(&self) -> Result<(), ClientError> { let request = Request::Disconnect; self.request_tx.send_async(request).await?; Ok(()) } - /// Sends a MQTT disconnect to the eventloop + /// Attempts to send a MQTT disconnect to the `EventLoop` pub fn try_disconnect(&self) -> Result<(), ClientError> { let request = Request::Disconnect; self.request_tx.try_send(request)?; @@ -207,10 +214,16 @@ fn get_ack_req(publish: &Publish) -> Option { Some(ack) } -/// `Client` to communicate with MQTT eventloop `Connection`. +/// A synchronous client, communicates with MQTT `EventLoop`. +/// +/// This is cloneable and can be used to synchronously [`publish`](`AsyncClient::publish`), +/// [`subscribe`](`AsynClient::subscribe`) through the `EventLoop`/`Connection`, which is to be polled in parallel +/// by iterating over the object returned by [`Connection.iter()`](Connection::iter) in a separate thread. /// -/// Client is cloneable and can be used to synchronously Publish, Subscribe. -/// Asynchronous channel handle can also be extracted if necessary +/// **NOTE**: The `EventLoop`/`Connection` must be regularly polled(`.next()` in case of `Connection`) in order +/// to send, receive and process packets from the broker, i.e. move ahead. +/// +/// An asynchronous channel handle can also be extracted if necessary. #[derive(Clone)] pub struct Client { client: AsyncClient, @@ -218,6 +231,8 @@ pub struct Client { impl Client { /// Create a new `Client` + /// + /// `cap` specifies the capacity of the bounded async channel. pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) { let (client, eventloop) = AsyncClient::new(options, cap); let client = Client { client }; @@ -230,7 +245,7 @@ impl Client { (client, connection) } - /// Sends a MQTT Publish to the eventloop + /// Sends a MQTT Publish to the `EventLoop` pub fn publish( &mut self, topic: S, @@ -261,25 +276,25 @@ impl Client { Ok(()) } - /// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set. + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> { pollster::block_on(self.client.ack(publish))?; Ok(()) } - /// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set. + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> { self.client.try_ack(publish)?; Ok(()) } - /// Sends a MQTT Subscribe to the eventloop + /// Sends a MQTT Subscribe to the `EventLoop` pub fn subscribe>(&mut self, topic: S, qos: QoS) -> Result<(), ClientError> { pollster::block_on(self.client.subscribe(topic, qos))?; Ok(()) } - /// Sends a MQTT Subscribe to the eventloop + /// Sends a MQTT Subscribe to the `EventLoop` pub fn try_subscribe>( &mut self, topic: S, @@ -289,7 +304,7 @@ impl Client { Ok(()) } - /// Sends a MQTT Subscribe for multiple topics to the eventloop + /// Sends a MQTT Subscribe for multiple topics to the `EventLoop` pub fn subscribe_many(&mut self, topics: T) -> Result<(), ClientError> where T: IntoIterator, @@ -304,25 +319,25 @@ impl Client { self.client.try_subscribe_many(topics) } - /// Sends a MQTT Unsubscribe to the eventloop + /// Sends a MQTT Unsubscribe to the `EventLoop` pub fn unsubscribe>(&mut self, topic: S) -> Result<(), ClientError> { pollster::block_on(self.client.unsubscribe(topic))?; Ok(()) } - /// Sends a MQTT Unsubscribe to the eventloop + /// Sends a MQTT Unsubscribe to the `EventLoop` pub fn try_unsubscribe>(&mut self, topic: S) -> Result<(), ClientError> { self.client.try_unsubscribe(topic)?; Ok(()) } - /// Sends a MQTT disconnect to the eventloop + /// Sends a MQTT disconnect to the `EventLoop` pub fn disconnect(&mut self) -> Result<(), ClientError> { pollster::block_on(self.client.disconnect())?; Ok(()) } - /// Sends a MQTT disconnect to the eventloop + /// Sends a MQTT disconnect to the `EventLoop` pub fn try_disconnect(&mut self) -> Result<(), ClientError> { self.client.try_disconnect()?; Ok(()) @@ -357,7 +372,7 @@ impl Connection { } } -/// Iterator which polls the eventloop for connection progress +/// Iterator which polls the `EventLoop` for connection progress pub struct Iter<'a> { connection: &'a mut Connection, runtime: runtime::Runtime, diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index b443c863..75a3c51e 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -195,6 +195,7 @@ impl From for Request { } } +/// Transport methods. Defaults to TCP. #[derive(Clone)] pub enum Transport { Tcp, @@ -279,6 +280,7 @@ impl Transport { } } +/// TLS configuration method #[derive(Clone)] #[cfg(feature = "use-rustls")] pub enum TlsConfiguration { @@ -304,7 +306,7 @@ impl From for TlsConfiguration { // TODO: Should all the options be exposed as public? Drawback // would be loosing the ability to panic when the user options // are wrong (e.g empty client id) or aggressive (keep alive time) -/// Options to configure the behaviour of mqtt connection +/// Options to configure the behaviour of MQTT connection #[derive(Clone)] pub struct MqttOptions { /// broker address that you want to connect to diff --git a/rumqttc/src/tls.rs b/rumqttc/src/tls.rs index e9508628..0d453f6f 100644 --- a/rumqttc/src/tls.rs +++ b/rumqttc/src/tls.rs @@ -15,19 +15,26 @@ use std::io::{BufReader, Cursor}; use std::net::AddrParseError; use std::sync::Arc; +/// TLS backend error #[derive(Debug, thiserror::Error)] pub enum Error { + /// Error parsing IP address #[error("Addr")] Addr(#[from] AddrParseError), + /// I/O related error #[error("I/O: {0}")] Io(#[from] io::Error), + /// Certificate/Name validation error #[error("Web Pki: {0}")] WebPki(#[from] webpki::Error), + /// Invalid DNS name #[error("DNS name")] DNSName(#[from] InvalidDnsNameError), + /// Error from rustls module #[error("TLS error: {0}")] TLS(#[from] rustls::Error), - #[error("No valid cert in chain")] + /// No valid certificate in chain + #[error("No valid certificate in chain")] NoValidCertInChain, }