Skip to content

Commit

Permalink
Transition entirely to flume in rumqttc
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed May 13, 2022
1 parent 399c62b commit 233110c
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 20 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ rustls-pemfile = { version = "0.3.0", optional = true }
async-tungstenite = { version = "0.16.1", default-features = false, features = ["tokio-rustls-native-certs"], optional = true }
ws_stream_tungstenite = { version = "0.7.0", default-features = false, features = ["tokio_io"], optional = true }
pollster = "0.2"
async-channel = "1.5"
log = "0.4"
thiserror = "1.0.21"
http = "^0.2"
Expand Down
18 changes: 9 additions & 9 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::mqttbytes::{self, v4::*, QoS};
use crate::{ConnectionError, Event, EventLoop, MqttOptions, Request};

use async_channel::{SendError, Sender, TrySendError};
use flume::{SendError, Sender, TrySendError};
use bytes::Bytes;
use std::mem;
use tokio::runtime;
Expand Down Expand Up @@ -69,7 +69,7 @@ impl AsyncClient {
let mut publish = Publish::new(topic, qos, payload);
publish.retain = retain;
let publish = Request::Publish(publish);
self.request_tx.send(publish).await?;
self.request_tx.send_async(publish).await?;
Ok(())
}

Expand Down Expand Up @@ -97,7 +97,7 @@ impl AsyncClient {
let ack = get_ack_req(publish);

if let Some(ack) = ack {
self.request_tx.send(ack).await?;
self.request_tx.send_async(ack).await?;
}
Ok(())
}
Expand Down Expand Up @@ -125,15 +125,15 @@ impl AsyncClient {
let mut publish = Publish::from_bytes(topic, qos, payload);
publish.retain = retain;
let publish = Request::Publish(publish);
self.request_tx.send(publish).await?;
self.request_tx.send_async(publish).await?;
Ok(())
}

/// Sends a MQTT Subscribe to the eventloop
pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let subscribe = Subscribe::new(topic.into(), qos);
let request = Request::Subscribe(subscribe);
self.request_tx.send(request).await?;
self.request_tx.send_async(request).await?;
Ok(())
}

Expand All @@ -152,7 +152,7 @@ impl AsyncClient {
{
let subscribe = Subscribe::new_many(topics)?;
let request = Request::Subscribe(subscribe);
self.request_tx.send(request).await?;
self.request_tx.send_async(request).await?;
Ok(())
}

Expand All @@ -171,7 +171,7 @@ impl AsyncClient {
pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe);
self.request_tx.send(request).await?;
self.request_tx.send_async(request).await?;
Ok(())
}

Expand All @@ -186,7 +186,7 @@ impl AsyncClient {
/// Sends a MQTT disconnect to the eventloop
pub async fn disconnect(&self) -> Result<(), ClientError> {
let request = Request::Disconnect;
self.request_tx.send(request).await?;
self.request_tx.send_async(request).await?;
Ok(())
}

Expand All @@ -199,7 +199,7 @@ impl AsyncClient {

/// Stops the eventloop right away
pub async fn cancel(&self) -> Result<(), ClientError> {
self.cancel_tx.send(()).await?;
self.cancel_tx.send_async(()).await?;
Ok(())
}
}
Expand Down
8 changes: 4 additions & 4 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::tls;
use crate::{Incoming, MqttOptions, MqttState, Outgoing, Packet, Request, StateError, Transport};

use crate::mqttbytes::v4::*;
use async_channel::{bounded, Receiver, Sender};
use flume::{bounded, Receiver, Sender};
#[cfg(feature = "websocket")]
use async_tungstenite::tokio::connect_async;
#[cfg(all(feature = "use-rustls", feature = "websocket"))]
Expand Down Expand Up @@ -200,7 +200,7 @@ impl EventLoop {
// After collision with pkid 1 -> [1b ,2, x, 4, 5].
// 1a is saved to state and event loop is set to collision mode stopping new
// outgoing requests (along with 1b).
o = self.requests_rx.recv(), if !inflight_full && !pending && !collision => match o {
o = self.requests_rx.recv_async(), if !inflight_full && !pending && !collision => match o {
Ok(request) => {
self.state.handle_outgoing_packet(request)?;
network.flush(&mut self.state.write).await?;
Expand All @@ -226,7 +226,7 @@ impl EventLoop {
Ok(self.state.events.pop_front().unwrap())
}
// cancellation requests to stop the polling
_ = self.cancel_rx.recv() => {
_ = self.cancel_rx.recv_async() => {
Err(ConnectionError::Cancel)
}
}
Expand All @@ -241,7 +241,7 @@ async fn connect_or_cancel(
// resolved. Returns with an error if connections fail continuously
select! {
o = connect(options) => o,
_ = cancel_rx.recv() => {
_ = cancel_rx.recv_async() => {
Err(ConnectionError::Cancel)
}
}
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ mod state;
mod tls;
pub mod v5;

pub use async_channel::{SendError, Sender, TrySendError};
pub use flume::{SendError, Sender, TrySendError};
pub use client::{AsyncClient, Client, ClientError, Connection};
pub use eventloop::{ConnectionError, Event, EventLoop};
pub use mqttbytes::v4::*;
Expand Down
6 changes: 3 additions & 3 deletions rumqttc/tests/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::net::TcpListener;
use tokio::select;
use tokio::{task, time};

use async_channel::{bounded, Receiver, Sender};
use flume::{bounded, Receiver, Sender};
use bytes::BytesMut;
use rumqttc::{Event, Incoming, Outgoing, Packet};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
Expand Down Expand Up @@ -136,7 +136,7 @@ impl Broker {
}

let packet = Packet::Publish(publish);
tx.send(packet).await.unwrap();
tx.send_async(packet).await.unwrap();
time::sleep(Duration::from_secs(delay)).await;
}
});
Expand All @@ -145,7 +145,7 @@ impl Broker {
/// Selects between outgoing and incoming packets
pub async fn tick(&mut self) -> Event {
select! {
request = self.outgoing_rx.recv() => {
request = self.outgoing_rx.recv_async() => {
let request = request.unwrap();
let outgoing = self.framed.write(request).await.unwrap();
Event::Outgoing(outgoing)
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/tests/reliability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn start_requests(count: u8, qos: QoS, delay: u64, requests_tx: Sender<Req

let publish = Publish::new(topic, qos, payload);
let request = Request::Publish(publish);
let _ = requests_tx.send(request).await;
let _ = requests_tx.send_async(request).await;
time::sleep(Duration::from_secs(delay)).await;
}
}
Expand Down

0 comments on commit 233110c

Please sign in to comment.