diff --git a/rumq-broker/design.md b/rumq-broker/design.md index ad952da5..9bb8b133 100644 --- a/rumq-broker/design.md +++ b/rumq-broker/design.md @@ -61,7 +61,7 @@ Disadvantages * Acks from/to router. Adds to processing. Microbatching can help here -Router full design +Shared router experiment ------------- * Router maintains all the state of connections @@ -95,6 +95,18 @@ single router. Prevents fragmentation * This inter router communication can be extended across network to make the broker distributed + +Findings: +------- + +* Need to separate mutable and immutable parts well +* Can't use tokio channel as sends need mutable borrows +* Even async std channels are a problem because the router can block + during `rc_router_tx.send()` causing Borrow mut panic while creating a + new connection +* Difficult to see Bandwidth of router and connections separately + + Distributed commit log ---------------- diff --git a/rumq-broker/src/connection.rs b/rumq-broker/src/connection.rs index ecaa54a2..83e251d4 100644 --- a/rumq-broker/src/connection.rs +++ b/rumq-broker/src/connection.rs @@ -124,7 +124,7 @@ impl Connection { // eventloop which processes packets and router messages let mut incoming = &mut self.stream; - let mut incoming = time::throttle(Duration::from_millis(0), &mut incoming); + let mut incoming = time::throttle(Duration::from_millis(10), &mut incoming); loop { let mut timeout = time::delay_for(keep_alive); let (done, routermessage) = select(&mut incoming, &mut self.this_rx, keep_alive, &mut timeout).await?; @@ -178,6 +178,12 @@ async fn select( } o = outgoing.next() => match o { Some(RouterMessage::Packet(packet)) => stream.get_mut().send(packet).await?, + Some(RouterMessage::Packets(packets)) => { + // TODO: Make these vectorized + for packet in packets.into_iter() { + stream.get_mut().send(packet).await? + } + } Some(message) => { warn!("Invalid router message = {:?}", message); return Ok((false, None)) diff --git a/rumq-broker/src/graveyard.rs b/rumq-broker/src/graveyard.rs deleted file mode 100644 index fb75976b..00000000 --- a/rumq-broker/src/graveyard.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; - -use crate::state::MqttState; - -use rumq_core::Packet; -use tokio::sync::mpsc::Sender; - -/// This is the place where all disconnection state is reaped. -/// For persistent connections,used to pick state back during reconnections. -/// Router uses this to pickup handle of a new connection -/// (instead of connection sending its handle to router during connection) -#[derive(Debug, Clone)] -pub struct Graveyard { - // handles to a connections. map of client id and connection channel tx - connection_handles: Arc>>>, - // state of disconnected persistent clients - connection_state: Arc>>, -} - -impl Graveyard { - pub fn new() -> Graveyard { - Graveyard { - connection_handles: Arc::new(Mutex::new(HashMap::new())), - connection_state: Arc::new(Mutex::new(HashMap::new())), - } - } - - pub fn reap(&self, id: &str, state: MqttState) { - let mut handles = self.connection_state.lock().unwrap(); - handles.insert(id.to_owned(), state); - } -} diff --git a/rumq-broker/src/router.rs b/rumq-broker/src/router.rs index 8a647d77..35346bb3 100644 --- a/rumq-broker/src/router.rs +++ b/rumq-broker/src/router.rs @@ -2,6 +2,9 @@ use derive_more::From; use rumq_core::{has_wildcards, matches, QoS, Packet, Connect, Publish, Subscribe, Unsubscribe}; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::error::TrySendError; +use tokio::select; +use tokio::time::{self, Duration}; +use tokio::stream::StreamExt; use std::collections::{HashMap, VecDeque}; use std::mem; @@ -25,6 +28,8 @@ pub enum RouterMessage { Connect(Connection), /// Packet Packet(rumq_core::Packet), + /// Packets + Packets(VecDeque), /// Disconnects a client from active connections list. Will handling Death(String), /// Pending messages of the previous connection @@ -54,6 +59,7 @@ impl fmt::Debug for Connection { #[derive(Debug)] struct ActiveConnection { pub state: MqttState, + pub outgoing: VecDeque, tx: Sender } @@ -61,6 +67,7 @@ impl ActiveConnection { pub fn new(tx: Sender, state: MqttState) -> ActiveConnection { ActiveConnection { state, + outgoing: VecDeque::new(), tx } } @@ -124,30 +131,48 @@ impl Router { } pub async fn start(&mut self) -> Result<(), Error> { + let mut interval = time::interval(Duration::from_millis(10)); + loop { - let (id, mut message) = match self.data_rx.recv().await { - Some(message) => message, - None => return Err(Error::AllSendersDown), - }; + select! { + o = self.data_rx.recv() => { + let (id, mut message) = o.unwrap(); + match self.reply(id.clone(), &mut message) { + Ok(Some(message)) => self.forward(&id, message), + Ok(None) => (), + Err(e) => { + error!("Incoming handle error = {:?}", e); + continue; + } + } - // replys beack to the connection - match self.handle_incoming_router_message(id.clone(), &mut message) { - Ok(Some(message)) => self.forward(&id, message), - Ok(None) => (), - Err(e) => { - error!("Incoming handle error = {:?}", e); - continue; + // adds routes, routes the message to other connections etc etc + if let Err(e) = self.route(id, message) { + error!("Routing error = {:?}", e); + } + } + o = interval.next() => { + for (_, connection) in self.active_connections.iter_mut() { + let pending = connection.outgoing.split_off(0); + if pending.len() > 0 { + let _ = connection.tx.try_send(RouterMessage::Packets(pending)); + } + } } } - // route the message to other connections - if let Err(e) = self.route(id, message) { - error!("Routing error = {:?}", e); - } + } } - fn handle_incoming_router_message(&mut self, id: String, message: &mut RouterMessage) -> Result, Error> { + /// replys back to the connection sending the message + /// doesn't modify any routing information of the router + /// all the routing and route modifications due to subscriptions + /// are part of `route` method + /// generates reply to send backto the connection. Shouldn't touch anything except active and + /// inactive connections + /// No routing modifications here + fn reply(&mut self, id: String, message: &mut RouterMessage) -> Result, Error> { debug!("Incoming router message. Id = {}, {:?}", id, message); match message { @@ -160,26 +185,27 @@ impl Router { let message = self.handle_incoming_packet(&id, packet.clone())?; Ok(message) } - RouterMessage::Death(id) => { - self.deactivate_and_forward_will(id.to_owned()); - Ok(None) - } - _ => unimplemented!() + _ => Ok(None) } } fn route(&mut self, id: String, message: RouterMessage) -> Result<(), Error> { - if let RouterMessage::Packet(packet) = message { + match message { + RouterMessage::Packet(packet) => { debug!("Routing router message. Id = {}, {:?}", id, packet); match packet { - Packet::Publish(publish) => self.match_subscriptions_and_forward(&id, publish), + Packet::Publish(publish) => self.match_subscriptions(&id, publish), Packet::Subscribe(subscribe) => self.add_to_subscriptions(id, subscribe), Packet::Unsubscribe(unsubscribe) => self.remove_from_subscriptions(id, unsubscribe), Packet::Disconnect => self.deactivate(id), _ => return Ok(()) } + } + RouterMessage::Death(id) => { + self.deactivate_and_forward_will(id.to_owned()); + } + _ => () } - Ok(()) } @@ -225,7 +251,7 @@ impl Router { Ok(reply) } - fn match_subscriptions_and_forward(&mut self, _id: &str, publish: Publish) { + fn match_subscriptions(&mut self, _id: &str, publish: Publish) { if publish.retain { if publish.payload.len() == 0 { self.retained_publishes.remove(&publish.topic_name); @@ -239,7 +265,7 @@ impl Router { if let Some(subscribers) = self.concrete_subscriptions.get(topic) { let subscribers = subscribers.clone(); for subscriber in subscribers.iter() { - self.forward_publish(subscriber, publish.clone()); + self.fill_subscriber(subscriber, publish.clone()); } } @@ -250,7 +276,7 @@ impl Router { if matches(&topic, &filter) { for subscriber in subscribers.into_iter() { let publish = publish.clone(); - self.forward_publish(&subscriber, publish); + self.fill_subscriber(&subscriber, publish); } } }; @@ -300,13 +326,13 @@ impl Router { let retained_publishes = self.retained_publishes.clone(); for (topic, publish) in retained_publishes.into_iter() { if matches(&topic, &filter) { - self.forward_publish(&subscriber, publish); + self.fill_subscriber(&subscriber, publish); } } } else { if let Some(publish) = self.retained_publishes.get(&filter) { let publish = publish.clone(); - self.forward_publish(&subscriber, publish); + self.fill_subscriber(&subscriber, publish); } } } @@ -449,7 +475,7 @@ impl Router { let qos = will.qos; let publish = rumq_core::publish(topic, qos, message); - self.match_subscriptions_and_forward(&id, publish); + self.match_subscriptions(&id, publish); } if !connection.state.clean_session { @@ -484,7 +510,7 @@ impl Router { } // forwards data to the connection with the following id - fn forward_publish(&mut self, subscriber: &Subscriber, mut publish: Publish) { + fn fill_subscriber(&mut self, subscriber: &Subscriber, mut publish: Publish) { publish.qos = subscriber.qos; if let Some(connection) = self.inactive_connections.get_mut(&subscriber.client_id) { @@ -493,16 +519,10 @@ impl Router { return } - let message = if let Some(connection) = self.active_connections.get_mut(&subscriber.client_id) { + if let Some(connection) = self.active_connections.get_mut(&subscriber.client_id) { let packet = connection.state.handle_outgoing_publish(publish); - let message = RouterMessage::Packet(packet); - debug!("Forwarding publish to active connection. Id = {}, {:?}", subscriber.client_id, message); - message - } else { - return - }; - - self.forward(&subscriber.client_id, message); + connection.outgoing.push_back(packet); + } } fn forward(&mut self, id: &str, message: RouterMessage) {