Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove ability to cancel EventLoop.poll() mid execution #421

Merged
merged 9 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
----------------
rumqttc
----------
-----------
- Change variants in `ClientError` to not expose dependence on flume/`SendError` (#420)
- Revert erroring out when Subscription filter list is empty (#422).
- Revert erroring out when Subscription filter list is empty (#422)
- Remove the ability to cancel `EventLoop.poll()` mid execution (#421)

-----------

Expand Down
1 change: 0 additions & 1 deletion rumqttc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ Quick overview of features
- Queue size based flow control on outgoing packets
- Automatic reconnections by just continuing the `eventloop.poll()/connection.iter()` loop
- Natural backpressure to client APIs during bad network
- Immediate cancellation with `client.cancel()`

In short, everything necessary to maintain a robust connection

Expand Down
40 changes: 4 additions & 36 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,12 @@ use tokio::runtime::Runtime;
/// Client Error
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error("Failed to send cancel request to eventloop")]
Cancel,
#[error("Failed to send mqtt requests to eventloop")]
Request(Request),
#[error("Failed to send mqtt requests to eventloop")]
TryRequest(Request),
}

impl From<SendError<()>> for ClientError {
fn from(_: SendError<()>) -> Self {
Self::Cancel
}
}

impl From<SendError<Request>> for ClientError {
fn from(e: SendError<Request>) -> Self {
Self::Request(e.into_inner())
Expand All @@ -43,31 +35,23 @@ impl From<TrySendError<Request>> for ClientError {
#[derive(Clone, Debug)]
pub struct AsyncClient {
request_tx: Sender<Request>,
cancel_tx: Sender<()>,
}

impl AsyncClient {
/// Create a new `AsyncClient`
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let mut eventloop = EventLoop::new(options, cap);
let eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.handle();
let cancel_tx = eventloop.cancel_handle();

let client = AsyncClient {
request_tx,
cancel_tx,
};
let client = AsyncClient { request_tx };

(client, eventloop)
}

/// Create a new `AsyncClient` from a pair of async channel `Sender`s. This is mostly useful for
/// creating a test instance.
pub fn from_senders(request_tx: Sender<Request>, cancel_tx: Sender<()>) -> AsyncClient {
AsyncClient {
request_tx,
cancel_tx,
}
pub fn from_senders(request_tx: Sender<Request>) -> AsyncClient {
AsyncClient { request_tx }
}

/// Sends a MQTT Publish to the eventloop
Expand Down Expand Up @@ -212,12 +196,6 @@ impl AsyncClient {
self.request_tx.try_send(request)?;
Ok(())
}

/// Stops the eventloop right away
pub async fn cancel(&self) -> Result<(), ClientError> {
self.cancel_tx.send_async(()).await?;
Ok(())
}
}

fn get_ack_req(publish: &Publish) -> Option<Request> {
Expand Down Expand Up @@ -349,12 +327,6 @@ impl Client {
self.client.try_disconnect()?;
Ok(())
}

/// Stops the eventloop right away
pub fn cancel(&mut self) -> Result<(), ClientError> {
pollster::block_on(self.client.cancel())?;
Ok(())
}
}

/// MQTT connection. Maintains all the necessary state
Expand Down Expand Up @@ -403,10 +375,6 @@ impl<'a> Iterator for Iter<'a> {
trace!("Done with requests");
None
}
Err(ConnectionError::Cancel) => {
trace!("Cancellation request received");
None
}
Err(e) => Some(Err(e)),
}
}
Expand Down
41 changes: 5 additions & 36 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ pub enum ConnectionError {
NotConnAck(Packet),
#[error("Requests done")]
RequestsDone,
#[error("Cancel request by the user")]
Cancel,
}

/// Eventloop with all the state of a connection
Expand All @@ -68,10 +66,6 @@ pub struct EventLoop {
pub(crate) network: Option<Network>,
/// Keep alive time
pub(crate) keepalive_timeout: Option<Pin<Box<Sleep>>>,
/// Handle to read cancellation requests
pub(crate) cancel_rx: Receiver<()>,
/// Handle to send cancellation requests (and drops)
pub(crate) cancel_tx: Sender<()>,
}

/// Events which can be yielded by the event loop
Expand All @@ -87,7 +81,6 @@ impl EventLoop {
/// When connection encounters critical errors (like auth failure), user has a choice to
/// access and update `options`, `state` and `requests`.
pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
let (cancel_tx, cancel_rx) = bounded(5);
let (requests_tx, requests_rx) = bounded(cap);
let pending = Vec::new();
let pending = pending.into_iter();
Expand All @@ -102,8 +95,6 @@ impl EventLoop {
pending,
network: None,
keepalive_timeout: None,
cancel_rx,
cancel_tx,
}
}

Expand All @@ -112,14 +103,6 @@ impl EventLoop {
self.requests_tx.clone()
}

/// Handle for cancelling the eventloop.
///
/// Can be useful in cases when connection should be halted immediately
/// between half-open connection detections or (re)connection timeouts
pub(crate) fn cancel_handle(&mut self) -> Sender<()> {
self.cancel_tx.clone()
}

fn clean(&mut self) {
self.network = None;
self.keepalive_timeout = None;
Expand All @@ -133,7 +116,11 @@ impl EventLoop {
/// **NOTE** Don't block this while iterating
pub async fn poll(&mut self) -> Result<Event, ConnectionError> {
if self.network.is_none() {
let (network, connack) = connect_or_cancel(&self.options, &self.cancel_rx).await?;
let (network, connack) = time::timeout(
Duration::from_secs(self.options.connection_timeout()),
connect(&self.options),
)
.await??;
self.network = Some(network);

if self.keepalive_timeout.is_none() {
Expand Down Expand Up @@ -225,24 +212,6 @@ impl EventLoop {
network.flush(&mut self.state.write).await?;
Ok(self.state.events.pop_front().unwrap())
}
// cancellation requests to stop the polling
_ = self.cancel_rx.recv_async() => {
Err(ConnectionError::Cancel)
}
}
}
}

async fn connect_or_cancel(
options: &MqttOptions,
cancel_rx: &Receiver<()>,
) -> Result<(Network, Incoming), ConnectionError> {
// select here prevents cancel request from being blocked until connection request is
// resolved. Returns with an error if connections fail continuously or is timedout.
select! {
o = time::timeout(Duration::from_secs(options.connection_timeout()), connect(options)) => o?,
_ = cancel_rx.recv_async() => {
Err(ConnectionError::Cancel)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
//! - Queue size based flow control on outgoing packets
//! - Automatic reconnections by just continuing the `eventloop.poll()`/`connection.iter()` loop
//! - Natural backpressure to client APIs during bad network
//! - Immediate cancellation with `client.cancel()`
//!
//! In short, everything necessary to maintain a robust connection
//!
Expand Down
7 changes: 0 additions & 7 deletions rumqttc/src/v5/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//! async eventloop.
use crate::v5::{packet::*, ConnectionError, EventLoop, Request};

use flume::SendError;
use std::mem;
use tokio::runtime::{self, Runtime};

Expand All @@ -14,8 +13,6 @@ pub use syncclient::Client;
/// Client Error
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error("Failed to send cancel request to eventloop")]
Cancel(SendError<()>),
#[error("Failed to send mqtt request to eventloop, the evenloop has been closed")]
EventloopClosed,
#[error("Failed to send mqtt request to evenloop, to requests buffer is full right now")]
Expand Down Expand Up @@ -79,10 +76,6 @@ impl<'a> Iterator for Iter<'a> {
trace!("Done with requests");
None
}
Err(ConnectionError::Cancel) => {
trace!("Cancellation request received");
None
}
Err(e) => Some(Err(e)),
}
}
Expand Down
2 changes: 0 additions & 2 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ pub enum ConnectionError {
StreamDone,
#[error("Requests done")]
RequestsDone,
#[error("Cancel request by the user")]
Cancel,
#[error("MQTT connection has been disconnected")]
Disconnected,
}
Expand Down