From c81a5b9629aaa4f30b7599efe66970f530106b19 Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 10 Feb 2020 17:39:06 +0530 Subject: [PATCH 1/2] Move deactivation into route --- rumq-broker/design.md | 14 +++++++++++++- rumq-broker/src/graveyard.rs | 33 --------------------------------- rumq-broker/src/router.rs | 31 +++++++++++++++++++------------ 3 files changed, 32 insertions(+), 46 deletions(-) delete mode 100644 rumq-broker/src/graveyard.rs diff --git a/rumq-broker/design.md b/rumq-broker/design.md index ad952da5..9bb8b133 100644 --- a/rumq-broker/design.md +++ b/rumq-broker/design.md @@ -61,7 +61,7 @@ Disadvantages * Acks from/to router. Adds to processing. Microbatching can help here -Router full design +Shared router experiment ------------- * Router maintains all the state of connections @@ -95,6 +95,18 @@ single router. Prevents fragmentation * This inter router communication can be extended across network to make the broker distributed + +Findings: +------- + +* Need to separate mutable and immutable parts well +* Can't use tokio channel as sends need mutable borrows +* Even async std channels are a problem because the router can block + during `rc_router_tx.send()` causing Borrow mut panic while creating a + new connection +* Difficult to see Bandwidth of router and connections separately + + Distributed commit log ---------------- diff --git a/rumq-broker/src/graveyard.rs b/rumq-broker/src/graveyard.rs deleted file mode 100644 index fb75976b..00000000 --- a/rumq-broker/src/graveyard.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; - -use crate::state::MqttState; - -use rumq_core::Packet; -use tokio::sync::mpsc::Sender; - -/// This is the place where all disconnection state is reaped. -/// For persistent connections,used to pick state back during reconnections. -/// Router uses this to pickup handle of a new connection -/// (instead of connection sending its handle to router during connection) -#[derive(Debug, Clone)] -pub struct Graveyard { - // handles to a connections. map of client id and connection channel tx - connection_handles: Arc>>>, - // state of disconnected persistent clients - connection_state: Arc>>, -} - -impl Graveyard { - pub fn new() -> Graveyard { - Graveyard { - connection_handles: Arc::new(Mutex::new(HashMap::new())), - connection_state: Arc::new(Mutex::new(HashMap::new())), - } - } - - pub fn reap(&self, id: &str, state: MqttState) { - let mut handles = self.connection_state.lock().unwrap(); - handles.insert(id.to_owned(), state); - } -} diff --git a/rumq-broker/src/router.rs b/rumq-broker/src/router.rs index 8a647d77..ee13fee0 100644 --- a/rumq-broker/src/router.rs +++ b/rumq-broker/src/router.rs @@ -130,8 +130,11 @@ impl Router { None => return Err(Error::AllSendersDown), }; - // replys beack to the connection - match self.handle_incoming_router_message(id.clone(), &mut message) { + // replys back to the connection sending the message + // doesn't modify any routing information of the router + // all the routing and route modifications due to subscriptions + // are part of `route` method + match self.reply(id.clone(), &mut message) { Ok(Some(message)) => self.forward(&id, message), Ok(None) => (), Err(e) => { @@ -140,14 +143,17 @@ impl Router { } } - // route the message to other connections + // adds routes, routes the message to other connections etc etc if let Err(e) = self.route(id, message) { - error!("Routing error = {:?}", e); + error!("Routing error = {:?}", e); } } } - fn handle_incoming_router_message(&mut self, id: String, message: &mut RouterMessage) -> Result, Error> { + /// generates reply to send backto the connection. Shouldn't touch anything except active and + /// inactive connections + /// No routing modifications here + fn reply(&mut self, id: String, message: &mut RouterMessage) -> Result, Error> { debug!("Incoming router message. Id = {}, {:?}", id, message); match message { @@ -160,16 +166,13 @@ impl Router { let message = self.handle_incoming_packet(&id, packet.clone())?; Ok(message) } - RouterMessage::Death(id) => { - self.deactivate_and_forward_will(id.to_owned()); - Ok(None) - } - _ => unimplemented!() + _ => Ok(None) } } fn route(&mut self, id: String, message: RouterMessage) -> Result<(), Error> { - if let RouterMessage::Packet(packet) = message { + match message { + RouterMessage::Packet(packet) => { debug!("Routing router message. Id = {}, {:?}", id, packet); match packet { Packet::Publish(publish) => self.match_subscriptions_and_forward(&id, publish), @@ -178,8 +181,12 @@ impl Router { Packet::Disconnect => self.deactivate(id), _ => return Ok(()) } + } + RouterMessage::Death(id) => { + self.deactivate_and_forward_will(id.to_owned()); + } + _ => () } - Ok(()) } From 8db199fcb31e476632fd4bab313d239da7222294 Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 10 Feb 2020 19:16:14 +0530 Subject: [PATCH 2/2] Feature to send bulk messages from the router --- rumq-broker/src/connection.rs | 8 +++- rumq-broker/src/router.rs | 83 ++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 36 deletions(-) diff --git a/rumq-broker/src/connection.rs b/rumq-broker/src/connection.rs index ecaa54a2..83e251d4 100644 --- a/rumq-broker/src/connection.rs +++ b/rumq-broker/src/connection.rs @@ -124,7 +124,7 @@ impl Connection { // eventloop which processes packets and router messages let mut incoming = &mut self.stream; - let mut incoming = time::throttle(Duration::from_millis(0), &mut incoming); + let mut incoming = time::throttle(Duration::from_millis(10), &mut incoming); loop { let mut timeout = time::delay_for(keep_alive); let (done, routermessage) = select(&mut incoming, &mut self.this_rx, keep_alive, &mut timeout).await?; @@ -178,6 +178,12 @@ async fn select( } o = outgoing.next() => match o { Some(RouterMessage::Packet(packet)) => stream.get_mut().send(packet).await?, + Some(RouterMessage::Packets(packets)) => { + // TODO: Make these vectorized + for packet in packets.into_iter() { + stream.get_mut().send(packet).await? + } + } Some(message) => { warn!("Invalid router message = {:?}", message); return Ok((false, None)) diff --git a/rumq-broker/src/router.rs b/rumq-broker/src/router.rs index ee13fee0..35346bb3 100644 --- a/rumq-broker/src/router.rs +++ b/rumq-broker/src/router.rs @@ -2,6 +2,9 @@ use derive_more::From; use rumq_core::{has_wildcards, matches, QoS, Packet, Connect, Publish, Subscribe, Unsubscribe}; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::error::TrySendError; +use tokio::select; +use tokio::time::{self, Duration}; +use tokio::stream::StreamExt; use std::collections::{HashMap, VecDeque}; use std::mem; @@ -25,6 +28,8 @@ pub enum RouterMessage { Connect(Connection), /// Packet Packet(rumq_core::Packet), + /// Packets + Packets(VecDeque), /// Disconnects a client from active connections list. Will handling Death(String), /// Pending messages of the previous connection @@ -54,6 +59,7 @@ impl fmt::Debug for Connection { #[derive(Debug)] struct ActiveConnection { pub state: MqttState, + pub outgoing: VecDeque, tx: Sender } @@ -61,6 +67,7 @@ impl ActiveConnection { pub fn new(tx: Sender, state: MqttState) -> ActiveConnection { ActiveConnection { state, + outgoing: VecDeque::new(), tx } } @@ -124,32 +131,44 @@ impl Router { } pub async fn start(&mut self) -> Result<(), Error> { + let mut interval = time::interval(Duration::from_millis(10)); + loop { - let (id, mut message) = match self.data_rx.recv().await { - Some(message) => message, - None => return Err(Error::AllSendersDown), - }; + select! { + o = self.data_rx.recv() => { + let (id, mut message) = o.unwrap(); + match self.reply(id.clone(), &mut message) { + Ok(Some(message)) => self.forward(&id, message), + Ok(None) => (), + Err(e) => { + error!("Incoming handle error = {:?}", e); + continue; + } + } - // replys back to the connection sending the message - // doesn't modify any routing information of the router - // all the routing and route modifications due to subscriptions - // are part of `route` method - match self.reply(id.clone(), &mut message) { - Ok(Some(message)) => self.forward(&id, message), - Ok(None) => (), - Err(e) => { - error!("Incoming handle error = {:?}", e); - continue; + // adds routes, routes the message to other connections etc etc + if let Err(e) = self.route(id, message) { + error!("Routing error = {:?}", e); + } + } + o = interval.next() => { + for (_, connection) in self.active_connections.iter_mut() { + let pending = connection.outgoing.split_off(0); + if pending.len() > 0 { + let _ = connection.tx.try_send(RouterMessage::Packets(pending)); + } + } } } - // adds routes, routes the message to other connections etc etc - if let Err(e) = self.route(id, message) { - error!("Routing error = {:?}", e); - } + } } + /// replys back to the connection sending the message + /// doesn't modify any routing information of the router + /// all the routing and route modifications due to subscriptions + /// are part of `route` method /// generates reply to send backto the connection. Shouldn't touch anything except active and /// inactive connections /// No routing modifications here @@ -175,7 +194,7 @@ impl Router { RouterMessage::Packet(packet) => { debug!("Routing router message. Id = {}, {:?}", id, packet); match packet { - Packet::Publish(publish) => self.match_subscriptions_and_forward(&id, publish), + Packet::Publish(publish) => self.match_subscriptions(&id, publish), Packet::Subscribe(subscribe) => self.add_to_subscriptions(id, subscribe), Packet::Unsubscribe(unsubscribe) => self.remove_from_subscriptions(id, unsubscribe), Packet::Disconnect => self.deactivate(id), @@ -232,7 +251,7 @@ impl Router { Ok(reply) } - fn match_subscriptions_and_forward(&mut self, _id: &str, publish: Publish) { + fn match_subscriptions(&mut self, _id: &str, publish: Publish) { if publish.retain { if publish.payload.len() == 0 { self.retained_publishes.remove(&publish.topic_name); @@ -246,7 +265,7 @@ impl Router { if let Some(subscribers) = self.concrete_subscriptions.get(topic) { let subscribers = subscribers.clone(); for subscriber in subscribers.iter() { - self.forward_publish(subscriber, publish.clone()); + self.fill_subscriber(subscriber, publish.clone()); } } @@ -257,7 +276,7 @@ impl Router { if matches(&topic, &filter) { for subscriber in subscribers.into_iter() { let publish = publish.clone(); - self.forward_publish(&subscriber, publish); + self.fill_subscriber(&subscriber, publish); } } }; @@ -307,13 +326,13 @@ impl Router { let retained_publishes = self.retained_publishes.clone(); for (topic, publish) in retained_publishes.into_iter() { if matches(&topic, &filter) { - self.forward_publish(&subscriber, publish); + self.fill_subscriber(&subscriber, publish); } } } else { if let Some(publish) = self.retained_publishes.get(&filter) { let publish = publish.clone(); - self.forward_publish(&subscriber, publish); + self.fill_subscriber(&subscriber, publish); } } } @@ -456,7 +475,7 @@ impl Router { let qos = will.qos; let publish = rumq_core::publish(topic, qos, message); - self.match_subscriptions_and_forward(&id, publish); + self.match_subscriptions(&id, publish); } if !connection.state.clean_session { @@ -491,7 +510,7 @@ impl Router { } // forwards data to the connection with the following id - fn forward_publish(&mut self, subscriber: &Subscriber, mut publish: Publish) { + fn fill_subscriber(&mut self, subscriber: &Subscriber, mut publish: Publish) { publish.qos = subscriber.qos; if let Some(connection) = self.inactive_connections.get_mut(&subscriber.client_id) { @@ -500,16 +519,10 @@ impl Router { return } - let message = if let Some(connection) = self.active_connections.get_mut(&subscriber.client_id) { + if let Some(connection) = self.active_connections.get_mut(&subscriber.client_id) { let packet = connection.state.handle_outgoing_publish(publish); - let message = RouterMessage::Packet(packet); - debug!("Forwarding publish to active connection. Id = {}, {:?}", subscriber.client_id, message); - message - } else { - return - }; - - self.forward(&subscriber.client_id, message); + connection.outgoing.push_back(packet); + } } fn forward(&mut self, id: &str, message: RouterMessage) {