diff --git a/Cargo.lock b/Cargo.lock index 142b86ca..ff987eb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1697,7 +1697,6 @@ dependencies = [ name = "rumqttc" version = "0.12.0" dependencies = [ - "async-channel", "async-tungstenite", "bytes 1.1.0", "color-backtrace", diff --git a/rumqttc/Cargo.toml b/rumqttc/Cargo.toml index aa157ab7..3717c74b 100644 --- a/rumqttc/Cargo.toml +++ b/rumqttc/Cargo.toml @@ -26,7 +26,6 @@ rustls-pemfile = { version = "0.3.0", optional = true } async-tungstenite = { version = "0.16.1", default-features = false, features = ["tokio-rustls-native-certs"], optional = true } ws_stream_tungstenite = { version = "0.7.0", default-features = false, features = ["tokio_io"], optional = true } pollster = "0.2" -async-channel = "1.5" log = "0.4" thiserror = "1.0.21" http = "^0.2" diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index a8c769a7..ba73c00a 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -3,7 +3,7 @@ use crate::mqttbytes::{self, v4::*, QoS}; use crate::{ConnectionError, Event, EventLoop, MqttOptions, Request}; -use async_channel::{SendError, Sender, TrySendError}; +use flume::{SendError, Sender, TrySendError}; use bytes::Bytes; use std::mem; use tokio::runtime; @@ -69,7 +69,7 @@ impl AsyncClient { let mut publish = Publish::new(topic, qos, payload); publish.retain = retain; let publish = Request::Publish(publish); - self.request_tx.send(publish).await?; + self.request_tx.send_async(publish).await?; Ok(()) } @@ -97,7 +97,7 @@ impl AsyncClient { let ack = get_ack_req(publish); if let Some(ack) = ack { - self.request_tx.send(ack).await?; + self.request_tx.send_async(ack).await?; } Ok(()) } @@ -125,7 +125,7 @@ impl AsyncClient { let mut publish = Publish::from_bytes(topic, qos, payload); publish.retain = retain; let publish = Request::Publish(publish); - self.request_tx.send(publish).await?; + self.request_tx.send_async(publish).await?; Ok(()) } @@ -133,7 +133,7 @@ impl AsyncClient { pub async fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { let subscribe = Subscribe::new(topic.into(), qos); let request = Request::Subscribe(subscribe); - self.request_tx.send(request).await?; + self.request_tx.send_async(request).await?; Ok(()) } @@ -152,7 +152,7 @@ impl AsyncClient { { let subscribe = Subscribe::new_many(topics)?; let request = Request::Subscribe(subscribe); - self.request_tx.send(request).await?; + self.request_tx.send_async(request).await?; Ok(()) } @@ -171,7 +171,7 @@ impl AsyncClient { pub async fn unsubscribe>(&self, topic: S) -> Result<(), ClientError> { let unsubscribe = Unsubscribe::new(topic.into()); let request = Request::Unsubscribe(unsubscribe); - self.request_tx.send(request).await?; + self.request_tx.send_async(request).await?; Ok(()) } @@ -186,7 +186,7 @@ impl AsyncClient { /// Sends a MQTT disconnect to the eventloop pub async fn disconnect(&self) -> Result<(), ClientError> { let request = Request::Disconnect; - self.request_tx.send(request).await?; + self.request_tx.send_async(request).await?; Ok(()) } @@ -199,7 +199,7 @@ impl AsyncClient { /// Stops the eventloop right away pub async fn cancel(&self) -> Result<(), ClientError> { - self.cancel_tx.send(()).await?; + self.cancel_tx.send_async(()).await?; Ok(()) } } diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index e65619b3..cbb683f9 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -4,7 +4,7 @@ use crate::tls; use crate::{Incoming, MqttOptions, MqttState, Outgoing, Packet, Request, StateError, Transport}; use crate::mqttbytes::v4::*; -use async_channel::{bounded, Receiver, Sender}; +use flume::{bounded, Receiver, Sender}; #[cfg(feature = "websocket")] use async_tungstenite::tokio::connect_async; #[cfg(all(feature = "use-rustls", feature = "websocket"))] @@ -200,7 +200,7 @@ impl EventLoop { // After collision with pkid 1 -> [1b ,2, x, 4, 5]. // 1a is saved to state and event loop is set to collision mode stopping new // outgoing requests (along with 1b). - o = self.requests_rx.recv(), if !inflight_full && !pending && !collision => match o { + o = self.requests_rx.recv_async(), if !inflight_full && !pending && !collision => match o { Ok(request) => { self.state.handle_outgoing_packet(request)?; network.flush(&mut self.state.write).await?; @@ -226,7 +226,7 @@ impl EventLoop { Ok(self.state.events.pop_front().unwrap()) } // cancellation requests to stop the polling - _ = self.cancel_rx.recv() => { + _ = self.cancel_rx.recv_async() => { Err(ConnectionError::Cancel) } } @@ -241,7 +241,7 @@ async fn connect_or_cancel( // resolved. Returns with an error if connections fail continuously select! { o = connect(options) => o, - _ = cancel_rx.recv() => { + _ = cancel_rx.recv_async() => { Err(ConnectionError::Cancel) } } diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index d1cf4af6..d47154cf 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -113,7 +113,7 @@ mod state; mod tls; pub mod v5; -pub use async_channel::{SendError, Sender, TrySendError}; +pub use flume::{SendError, Sender, TrySendError}; pub use client::{AsyncClient, Client, ClientError, Connection}; pub use eventloop::{ConnectionError, Event, EventLoop}; pub use mqttbytes::v4::*; diff --git a/rumqttc/tests/broker.rs b/rumqttc/tests/broker.rs index dc788c0e..1f8c9c18 100644 --- a/rumqttc/tests/broker.rs +++ b/rumqttc/tests/broker.rs @@ -7,7 +7,7 @@ use tokio::net::TcpListener; use tokio::select; use tokio::{task, time}; -use async_channel::{bounded, Receiver, Sender}; +use flume::{bounded, Receiver, Sender}; use bytes::BytesMut; use rumqttc::{Event, Incoming, Outgoing, Packet}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; @@ -136,7 +136,7 @@ impl Broker { } let packet = Packet::Publish(publish); - tx.send(packet).await.unwrap(); + tx.send_async(packet).await.unwrap(); time::sleep(Duration::from_secs(delay)).await; } }); @@ -145,7 +145,7 @@ impl Broker { /// Selects between outgoing and incoming packets pub async fn tick(&mut self) -> Event { select! { - request = self.outgoing_rx.recv() => { + request = self.outgoing_rx.recv_async() => { let request = request.unwrap(); let outgoing = self.framed.write(request).await.unwrap(); Event::Outgoing(outgoing) diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index feb78b9c..55e7b80a 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -14,7 +14,7 @@ async fn start_requests(count: u8, qos: QoS, delay: u64, requests_tx: Sender