Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: better explain client surfaces #380

Merged
merged 7 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rumqttc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ Quick overview of features
- Queue size based flow control on outgoing packets
- Automatic reconnections by just continuing the `eventloop.poll()/connection.iter()` loop
- Natural backpressure to client APIs during bad network
- Support for WebSockets
- Secure transport using TLS

In short, everything necessary to maintain a robust connection

Expand Down
75 changes: 45 additions & 30 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,22 @@ impl From<TrySendError<Request>> for ClientError {
}
}

/// `AsyncClient` to communicate with MQTT `Eventloop`
/// This is cloneable and can be used to asynchronously Publish, Subscribe.
/// An asynchronous client, communicates with MQTT `EventLoop`.
///
/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
/// [`subscribe`](`AsynClient::subscribe`) through the `EventLoop`, which is to be polled parallelly.
///
/// **NOTE**: The `EventLoop` must be regularly polled in order to send, receive and process packets
/// from the broker, i.e. move ahead.
#[derive(Clone, Debug)]
pub struct AsyncClient {
request_tx: Sender<Request>,
}

impl AsyncClient {
/// Create a new `AsyncClient`
/// Create a new `AsyncClient`.
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.handle();
Expand All @@ -54,7 +61,7 @@ impl AsyncClient {
AsyncClient { request_tx }
}

/// Sends a MQTT Publish to the eventloop
/// Sends a MQTT Publish to the `EventLoop`.
pub async fn publish<S, V>(
&self,
topic: S,
Expand All @@ -73,7 +80,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Publish to the eventloop
/// Attempts to send a MQTT Publish to the `EventLoop`.
pub fn try_publish<S, V>(
&self,
topic: S,
Expand All @@ -92,7 +99,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);

Expand All @@ -102,7 +109,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
/// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
if let Some(ack) = ack {
Expand All @@ -111,7 +118,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Publish to the eventloop
/// Sends a MQTT Publish to the `EventLoop`
pub async fn publish_bytes<S>(
&self,
topic: S,
Expand All @@ -129,23 +136,23 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Subscribe to the eventloop
/// Sends a MQTT Subscribe to the `EventLoop`
pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let subscribe = Subscribe::new(topic.into(), qos);
let request = Request::Subscribe(subscribe);
self.request_tx.send_async(request).await?;
Ok(())
}

/// Sends a MQTT Subscribe to the eventloop
/// Attempts to send a MQTT Subscribe to the `EventLoop`
pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let subscribe = Subscribe::new(topic.into(), qos);
let request = Request::Subscribe(subscribe);
self.request_tx.try_send(request)?;
Ok(())
}

/// Sends a MQTT Subscribe for multiple topics to the eventloop
/// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
Expand All @@ -156,7 +163,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Subscribe for multiple topics to the eventloop
/// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
Expand All @@ -167,30 +174,30 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Unsubscribe to the eventloop
/// Sends a MQTT Unsubscribe to the `EventLoop`
pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe);
self.request_tx.send_async(request).await?;
Ok(())
}

/// Sends a MQTT Unsubscribe to the eventloop
/// Attempts to send a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe);
self.request_tx.try_send(request)?;
Ok(())
}

/// Sends a MQTT disconnect to the eventloop
/// Sends a MQTT disconnect to the `EventLoop`
pub async fn disconnect(&self) -> Result<(), ClientError> {
let request = Request::Disconnect;
self.request_tx.send_async(request).await?;
Ok(())
}

/// Sends a MQTT disconnect to the eventloop
/// Attempts to send a MQTT disconnect to the `EventLoop`
pub fn try_disconnect(&self) -> Result<(), ClientError> {
let request = Request::Disconnect;
self.request_tx.try_send(request)?;
Expand All @@ -207,17 +214,25 @@ fn get_ack_req(publish: &Publish) -> Option<Request> {
Some(ack)
}

/// `Client` to communicate with MQTT eventloop `Connection`.
/// A synchronous client, communicates with MQTT `EventLoop`.
///
/// This is cloneable and can be used to synchronously [`publish`](`AsyncClient::publish`),
/// [`subscribe`](`AsynClient::subscribe`) through the `EventLoop`/`Connection`, which is to be polled in parallel
/// by iterating over the object returned by [`Connection.iter()`](Connection::iter) in a separate thread.
///
/// Client is cloneable and can be used to synchronously Publish, Subscribe.
/// Asynchronous channel handle can also be extracted if necessary
/// **NOTE**: The `EventLoop`/`Connection` must be regularly polled(`.next()` in case of `Connection`) in order
/// to send, receive and process packets from the broker, i.e. move ahead.
///
/// An asynchronous channel handle can also be extracted if necessary.
#[derive(Clone)]
pub struct Client {
client: AsyncClient,
}

impl Client {
/// Create a new `Client`
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
let (client, eventloop) = AsyncClient::new(options, cap);
let client = Client { client };
Expand All @@ -230,7 +245,7 @@ impl Client {
(client, connection)
}

/// Sends a MQTT Publish to the eventloop
/// Sends a MQTT Publish to the `EventLoop`
pub fn publish<S, V>(
&mut self,
topic: S,
Expand Down Expand Up @@ -261,25 +276,25 @@ impl Client {
Ok(())
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
pollster::block_on(self.client.ack(publish))?;
Ok(())
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
self.client.try_ack(publish)?;
Ok(())
}

/// Sends a MQTT Subscribe to the eventloop
/// Sends a MQTT Subscribe to the `EventLoop`
pub fn subscribe<S: Into<String>>(&mut self, topic: S, qos: QoS) -> Result<(), ClientError> {
pollster::block_on(self.client.subscribe(topic, qos))?;
Ok(())
}

/// Sends a MQTT Subscribe to the eventloop
/// Sends a MQTT Subscribe to the `EventLoop`
pub fn try_subscribe<S: Into<String>>(
&mut self,
topic: S,
Expand All @@ -289,7 +304,7 @@ impl Client {
Ok(())
}

/// Sends a MQTT Subscribe for multiple topics to the eventloop
/// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
pub fn subscribe_many<T>(&mut self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
Expand All @@ -304,25 +319,25 @@ impl Client {
self.client.try_subscribe_many(topics)
}

/// Sends a MQTT Unsubscribe to the eventloop
/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<(), ClientError> {
pollster::block_on(self.client.unsubscribe(topic))?;
Ok(())
}

/// Sends a MQTT Unsubscribe to the eventloop
/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<(), ClientError> {
self.client.try_unsubscribe(topic)?;
Ok(())
}

/// Sends a MQTT disconnect to the eventloop
/// Sends a MQTT disconnect to the `EventLoop`
pub fn disconnect(&mut self) -> Result<(), ClientError> {
pollster::block_on(self.client.disconnect())?;
Ok(())
}

/// Sends a MQTT disconnect to the eventloop
/// Sends a MQTT disconnect to the `EventLoop`
pub fn try_disconnect(&mut self) -> Result<(), ClientError> {
self.client.try_disconnect()?;
Ok(())
Expand Down Expand Up @@ -357,7 +372,7 @@ impl Connection {
}
}

/// Iterator which polls the eventloop for connection progress
/// Iterator which polls the `EventLoop` for connection progress
pub struct Iter<'a> {
connection: &'a mut Connection,
runtime: runtime::Runtime,
Expand Down
4 changes: 3 additions & 1 deletion rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl From<Unsubscribe> for Request {
}
}

/// Transport methods. Defaults to TCP.
#[derive(Clone)]
pub enum Transport {
Tcp,
Expand Down Expand Up @@ -279,6 +280,7 @@ impl Transport {
}
}

/// TLS configuration method
#[derive(Clone)]
#[cfg(feature = "use-rustls")]
pub enum TlsConfiguration {
Expand All @@ -304,7 +306,7 @@ impl From<ClientConfig> for TlsConfiguration {
// TODO: Should all the options be exposed as public? Drawback
// would be loosing the ability to panic when the user options
// are wrong (e.g empty client id) or aggressive (keep alive time)
/// Options to configure the behaviour of mqtt connection
/// Options to configure the behaviour of MQTT connection
#[derive(Clone)]
pub struct MqttOptions {
/// broker address that you want to connect to
Expand Down
9 changes: 8 additions & 1 deletion rumqttc/src/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,26 @@ use std::io::{BufReader, Cursor};
use std::net::AddrParseError;
use std::sync::Arc;

/// TLS backend error
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Error parsing IP address
#[error("Addr")]
Addr(#[from] AddrParseError),
/// I/O related error
#[error("I/O: {0}")]
Io(#[from] io::Error),
/// Certificate/Name validation error
#[error("Web Pki: {0}")]
WebPki(#[from] webpki::Error),
/// Invalid DNS name
#[error("DNS name")]
DNSName(#[from] InvalidDnsNameError),
/// Error from rustls module
#[error("TLS error: {0}")]
TLS(#[from] rustls::Error),
#[error("No valid cert in chain")]
/// No valid certificate in chain
#[error("No valid certificate in chain")]
NoValidCertInChain,
}

Expand Down