Skip to content

Commit

Permalink
Bulk forwards (#32)
Browse files Browse the repository at this point in the history
* Move deactivation into route

* Feature to send bulk messages from the router
  • Loading branch information
Ravi Teja authored Feb 10, 2020
1 parent 922b0f8 commit 30725be
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 75 deletions.
14 changes: 13 additions & 1 deletion rumq-broker/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Disadvantages
* Acks from/to router. Adds to processing. Microbatching can help here


Router full design
Shared router experiment
-------------

* Router maintains all the state of connections
Expand Down Expand Up @@ -95,6 +95,18 @@ single router. Prevents fragmentation
* This inter router communication can be extended across network to make
the broker distributed


Findings:
-------

* Need to separate mutable and immutable parts well
* Can't use tokio channel as sends need mutable borrows
* Even async std channels are a problem because the router can block
during `rc_router_tx.send()` causing Borrow mut panic while creating a
new connection
* Difficult to see Bandwidth of router and connections separately


Distributed commit log
----------------

Expand Down
8 changes: 7 additions & 1 deletion rumq-broker/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<S: Network> Connection<S> {

// eventloop which processes packets and router messages
let mut incoming = &mut self.stream;
let mut incoming = time::throttle(Duration::from_millis(0), &mut incoming);
let mut incoming = time::throttle(Duration::from_millis(10), &mut incoming);
loop {
let mut timeout = time::delay_for(keep_alive);
let (done, routermessage) = select(&mut incoming, &mut self.this_rx, keep_alive, &mut timeout).await?;
Expand Down Expand Up @@ -178,6 +178,12 @@ async fn select<S: Network>(
}
o = outgoing.next() => match o {
Some(RouterMessage::Packet(packet)) => stream.get_mut().send(packet).await?,
Some(RouterMessage::Packets(packets)) => {
// TODO: Make these vectorized
for packet in packets.into_iter() {
stream.get_mut().send(packet).await?
}
}
Some(message) => {
warn!("Invalid router message = {:?}", message);
return Ok((false, None))
Expand Down
33 changes: 0 additions & 33 deletions rumq-broker/src/graveyard.rs

This file was deleted.

100 changes: 60 additions & 40 deletions rumq-broker/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use derive_more::From;
use rumq_core::{has_wildcards, matches, QoS, Packet, Connect, Publish, Subscribe, Unsubscribe};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::error::TrySendError;
use tokio::select;
use tokio::time::{self, Duration};
use tokio::stream::StreamExt;

use std::collections::{HashMap, VecDeque};
use std::mem;
Expand All @@ -25,6 +28,8 @@ pub enum RouterMessage {
Connect(Connection),
/// Packet
Packet(rumq_core::Packet),
/// Packets
Packets(VecDeque<rumq_core::Packet>),
/// Disconnects a client from active connections list. Will handling
Death(String),
/// Pending messages of the previous connection
Expand Down Expand Up @@ -54,13 +59,15 @@ impl fmt::Debug for Connection {
#[derive(Debug)]
struct ActiveConnection {
pub state: MqttState,
pub outgoing: VecDeque<rumq_core::Packet>,
tx: Sender<RouterMessage>
}

impl ActiveConnection {
pub fn new(tx: Sender<RouterMessage>, state: MqttState) -> ActiveConnection {
ActiveConnection {
state,
outgoing: VecDeque::new(),
tx
}
}
Expand Down Expand Up @@ -124,30 +131,48 @@ impl Router {
}

pub async fn start(&mut self) -> Result<(), Error> {
let mut interval = time::interval(Duration::from_millis(10));

loop {
let (id, mut message) = match self.data_rx.recv().await {
Some(message) => message,
None => return Err(Error::AllSendersDown),
};
select! {
o = self.data_rx.recv() => {
let (id, mut message) = o.unwrap();
match self.reply(id.clone(), &mut message) {
Ok(Some(message)) => self.forward(&id, message),
Ok(None) => (),
Err(e) => {
error!("Incoming handle error = {:?}", e);
continue;
}
}

// replys beack to the connection
match self.handle_incoming_router_message(id.clone(), &mut message) {
Ok(Some(message)) => self.forward(&id, message),
Ok(None) => (),
Err(e) => {
error!("Incoming handle error = {:?}", e);
continue;
// adds routes, routes the message to other connections etc etc
if let Err(e) = self.route(id, message) {
error!("Routing error = {:?}", e);
}
}
o = interval.next() => {
for (_, connection) in self.active_connections.iter_mut() {
let pending = connection.outgoing.split_off(0);
if pending.len() > 0 {
let _ = connection.tx.try_send(RouterMessage::Packets(pending));
}
}
}
}

// route the message to other connections
if let Err(e) = self.route(id, message) {
error!("Routing error = {:?}", e);
}

}
}

fn handle_incoming_router_message(&mut self, id: String, message: &mut RouterMessage) -> Result<Option<RouterMessage>, Error> {
/// replys back to the connection sending the message
/// doesn't modify any routing information of the router
/// all the routing and route modifications due to subscriptions
/// are part of `route` method
/// generates reply to send backto the connection. Shouldn't touch anything except active and
/// inactive connections
/// No routing modifications here
fn reply(&mut self, id: String, message: &mut RouterMessage) -> Result<Option<RouterMessage>, Error> {
debug!("Incoming router message. Id = {}, {:?}", id, message);

match message {
Expand All @@ -160,26 +185,27 @@ impl Router {
let message = self.handle_incoming_packet(&id, packet.clone())?;
Ok(message)
}
RouterMessage::Death(id) => {
self.deactivate_and_forward_will(id.to_owned());
Ok(None)
}
_ => unimplemented!()
_ => Ok(None)
}
}

fn route(&mut self, id: String, message: RouterMessage) -> Result<(), Error> {
if let RouterMessage::Packet(packet) = message {
match message {
RouterMessage::Packet(packet) => {
debug!("Routing router message. Id = {}, {:?}", id, packet);
match packet {
Packet::Publish(publish) => self.match_subscriptions_and_forward(&id, publish),
Packet::Publish(publish) => self.match_subscriptions(&id, publish),
Packet::Subscribe(subscribe) => self.add_to_subscriptions(id, subscribe),
Packet::Unsubscribe(unsubscribe) => self.remove_from_subscriptions(id, unsubscribe),
Packet::Disconnect => self.deactivate(id),
_ => return Ok(())
}
}
RouterMessage::Death(id) => {
self.deactivate_and_forward_will(id.to_owned());
}
_ => ()
}

Ok(())
}

Expand Down Expand Up @@ -225,7 +251,7 @@ impl Router {
Ok(reply)
}

fn match_subscriptions_and_forward(&mut self, _id: &str, publish: Publish) {
fn match_subscriptions(&mut self, _id: &str, publish: Publish) {
if publish.retain {
if publish.payload.len() == 0 {
self.retained_publishes.remove(&publish.topic_name);
Expand All @@ -239,7 +265,7 @@ impl Router {
if let Some(subscribers) = self.concrete_subscriptions.get(topic) {
let subscribers = subscribers.clone();
for subscriber in subscribers.iter() {
self.forward_publish(subscriber, publish.clone());
self.fill_subscriber(subscriber, publish.clone());
}
}

Expand All @@ -250,7 +276,7 @@ impl Router {
if matches(&topic, &filter) {
for subscriber in subscribers.into_iter() {
let publish = publish.clone();
self.forward_publish(&subscriber, publish);
self.fill_subscriber(&subscriber, publish);
}
}
};
Expand Down Expand Up @@ -300,13 +326,13 @@ impl Router {
let retained_publishes = self.retained_publishes.clone();
for (topic, publish) in retained_publishes.into_iter() {
if matches(&topic, &filter) {
self.forward_publish(&subscriber, publish);
self.fill_subscriber(&subscriber, publish);
}
}
} else {
if let Some(publish) = self.retained_publishes.get(&filter) {
let publish = publish.clone();
self.forward_publish(&subscriber, publish);
self.fill_subscriber(&subscriber, publish);
}
}
}
Expand Down Expand Up @@ -449,7 +475,7 @@ impl Router {
let qos = will.qos;

let publish = rumq_core::publish(topic, qos, message);
self.match_subscriptions_and_forward(&id, publish);
self.match_subscriptions(&id, publish);
}

if !connection.state.clean_session {
Expand Down Expand Up @@ -484,7 +510,7 @@ impl Router {
}

// forwards data to the connection with the following id
fn forward_publish(&mut self, subscriber: &Subscriber, mut publish: Publish) {
fn fill_subscriber(&mut self, subscriber: &Subscriber, mut publish: Publish) {
publish.qos = subscriber.qos;

if let Some(connection) = self.inactive_connections.get_mut(&subscriber.client_id) {
Expand All @@ -493,16 +519,10 @@ impl Router {
return
}

let message = if let Some(connection) = self.active_connections.get_mut(&subscriber.client_id) {
if let Some(connection) = self.active_connections.get_mut(&subscriber.client_id) {
let packet = connection.state.handle_outgoing_publish(publish);
let message = RouterMessage::Packet(packet);
debug!("Forwarding publish to active connection. Id = {}, {:?}", subscriber.client_id, message);
message
} else {
return
};

self.forward(&subscriber.client_id, message);
connection.outgoing.push_back(packet);
}
}

fn forward(&mut self, id: &str, message: RouterMessage) {
Expand Down

0 comments on commit 30725be

Please sign in to comment.