Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk forwards #32

Merged
merged 2 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion rumq-broker/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Disadvantages
* Acks from/to router. Adds to processing. Microbatching can help here


Router full design
Shared router experiment
-------------

* Router maintains all the state of connections
Expand Down Expand Up @@ -95,6 +95,18 @@ single router. Prevents fragmentation
* This inter router communication can be extended across network to make
the broker distributed


Findings:
-------

* Need to separate mutable and immutable parts well
* Can't use tokio channel as sends need mutable borrows
* Even async std channels are a problem because the router can block
during `rc_router_tx.send()` causing Borrow mut panic while creating a
new connection
* Difficult to see Bandwidth of router and connections separately


Distributed commit log
----------------

Expand Down
8 changes: 7 additions & 1 deletion rumq-broker/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<S: Network> Connection<S> {

// eventloop which processes packets and router messages
let mut incoming = &mut self.stream;
let mut incoming = time::throttle(Duration::from_millis(0), &mut incoming);
let mut incoming = time::throttle(Duration::from_millis(10), &mut incoming);
loop {
let mut timeout = time::delay_for(keep_alive);
let (done, routermessage) = select(&mut incoming, &mut self.this_rx, keep_alive, &mut timeout).await?;
Expand Down Expand Up @@ -178,6 +178,12 @@ async fn select<S: Network>(
}
o = outgoing.next() => match o {
Some(RouterMessage::Packet(packet)) => stream.get_mut().send(packet).await?,
Some(RouterMessage::Packets(packets)) => {
// TODO: Make these vectorized
for packet in packets.into_iter() {
stream.get_mut().send(packet).await?
}
}
Some(message) => {
warn!("Invalid router message = {:?}", message);
return Ok((false, None))
Expand Down
33 changes: 0 additions & 33 deletions rumq-broker/src/graveyard.rs

This file was deleted.

100 changes: 60 additions & 40 deletions rumq-broker/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use derive_more::From;
use rumq_core::{has_wildcards, matches, QoS, Packet, Connect, Publish, Subscribe, Unsubscribe};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::error::TrySendError;
use tokio::select;
use tokio::time::{self, Duration};
use tokio::stream::StreamExt;

use std::collections::{HashMap, VecDeque};
use std::mem;
Expand All @@ -25,6 +28,8 @@ pub enum RouterMessage {
Connect(Connection),
/// Packet
Packet(rumq_core::Packet),
/// Packets
Packets(VecDeque<rumq_core::Packet>),
/// Disconnects a client from active connections list. Will handling
Death(String),
/// Pending messages of the previous connection
Expand Down Expand Up @@ -54,13 +59,15 @@ impl fmt::Debug for Connection {
#[derive(Debug)]
struct ActiveConnection {
pub state: MqttState,
pub outgoing: VecDeque<rumq_core::Packet>,
tx: Sender<RouterMessage>
}

impl ActiveConnection {
pub fn new(tx: Sender<RouterMessage>, state: MqttState) -> ActiveConnection {
ActiveConnection {
state,
outgoing: VecDeque::new(),
tx
}
}
Expand Down Expand Up @@ -124,30 +131,48 @@ impl Router {
}

pub async fn start(&mut self) -> Result<(), Error> {
let mut interval = time::interval(Duration::from_millis(10));

loop {
let (id, mut message) = match self.data_rx.recv().await {
Some(message) => message,
None => return Err(Error::AllSendersDown),
};
select! {
o = self.data_rx.recv() => {
let (id, mut message) = o.unwrap();
match self.reply(id.clone(), &mut message) {
Ok(Some(message)) => self.forward(&id, message),
Ok(None) => (),
Err(e) => {
error!("Incoming handle error = {:?}", e);
continue;
}
}

// replys beack to the connection
match self.handle_incoming_router_message(id.clone(), &mut message) {
Ok(Some(message)) => self.forward(&id, message),
Ok(None) => (),
Err(e) => {
error!("Incoming handle error = {:?}", e);
continue;
// adds routes, routes the message to other connections etc etc
if let Err(e) = self.route(id, message) {
error!("Routing error = {:?}", e);
}
}
o = interval.next() => {
for (_, connection) in self.active_connections.iter_mut() {
let pending = connection.outgoing.split_off(0);
if pending.len() > 0 {
let _ = connection.tx.try_send(RouterMessage::Packets(pending));
}
}
}
}

// route the message to other connections
if let Err(e) = self.route(id, message) {
error!("Routing error = {:?}", e);
}

}
}

fn handle_incoming_router_message(&mut self, id: String, message: &mut RouterMessage) -> Result<Option<RouterMessage>, Error> {
/// replys back to the connection sending the message
/// doesn't modify any routing information of the router
/// all the routing and route modifications due to subscriptions
/// are part of `route` method
/// generates reply to send backto the connection. Shouldn't touch anything except active and
/// inactive connections
/// No routing modifications here
fn reply(&mut self, id: String, message: &mut RouterMessage) -> Result<Option<RouterMessage>, Error> {
debug!("Incoming router message. Id = {}, {:?}", id, message);

match message {
Expand All @@ -160,26 +185,27 @@ impl Router {
let message = self.handle_incoming_packet(&id, packet.clone())?;
Ok(message)
}
RouterMessage::Death(id) => {
self.deactivate_and_forward_will(id.to_owned());
Ok(None)
}
_ => unimplemented!()
_ => Ok(None)
}
}

fn route(&mut self, id: String, message: RouterMessage) -> Result<(), Error> {
if let RouterMessage::Packet(packet) = message {
match message {
RouterMessage::Packet(packet) => {
debug!("Routing router message. Id = {}, {:?}", id, packet);
match packet {
Packet::Publish(publish) => self.match_subscriptions_and_forward(&id, publish),
Packet::Publish(publish) => self.match_subscriptions(&id, publish),
Packet::Subscribe(subscribe) => self.add_to_subscriptions(id, subscribe),
Packet::Unsubscribe(unsubscribe) => self.remove_from_subscriptions(id, unsubscribe),
Packet::Disconnect => self.deactivate(id),
_ => return Ok(())
}
}
RouterMessage::Death(id) => {
self.deactivate_and_forward_will(id.to_owned());
}
_ => ()
}

Ok(())
}

Expand Down Expand Up @@ -225,7 +251,7 @@ impl Router {
Ok(reply)
}

fn match_subscriptions_and_forward(&mut self, _id: &str, publish: Publish) {
fn match_subscriptions(&mut self, _id: &str, publish: Publish) {
if publish.retain {
if publish.payload.len() == 0 {
self.retained_publishes.remove(&publish.topic_name);
Expand All @@ -239,7 +265,7 @@ impl Router {
if let Some(subscribers) = self.concrete_subscriptions.get(topic) {
let subscribers = subscribers.clone();
for subscriber in subscribers.iter() {
self.forward_publish(subscriber, publish.clone());
self.fill_subscriber(subscriber, publish.clone());
}
}

Expand All @@ -250,7 +276,7 @@ impl Router {
if matches(&topic, &filter) {
for subscriber in subscribers.into_iter() {
let publish = publish.clone();
self.forward_publish(&subscriber, publish);
self.fill_subscriber(&subscriber, publish);
}
}
};
Expand Down Expand Up @@ -300,13 +326,13 @@ impl Router {
let retained_publishes = self.retained_publishes.clone();
for (topic, publish) in retained_publishes.into_iter() {
if matches(&topic, &filter) {
self.forward_publish(&subscriber, publish);
self.fill_subscriber(&subscriber, publish);
}
}
} else {
if let Some(publish) = self.retained_publishes.get(&filter) {
let publish = publish.clone();
self.forward_publish(&subscriber, publish);
self.fill_subscriber(&subscriber, publish);
}
}
}
Expand Down Expand Up @@ -449,7 +475,7 @@ impl Router {
let qos = will.qos;

let publish = rumq_core::publish(topic, qos, message);
self.match_subscriptions_and_forward(&id, publish);
self.match_subscriptions(&id, publish);
}

if !connection.state.clean_session {
Expand Down Expand Up @@ -484,7 +510,7 @@ impl Router {
}

// forwards data to the connection with the following id
fn forward_publish(&mut self, subscriber: &Subscriber, mut publish: Publish) {
fn fill_subscriber(&mut self, subscriber: &Subscriber, mut publish: Publish) {
publish.qos = subscriber.qos;

if let Some(connection) = self.inactive_connections.get_mut(&subscriber.client_id) {
Expand All @@ -493,16 +519,10 @@ impl Router {
return
}

let message = if let Some(connection) = self.active_connections.get_mut(&subscriber.client_id) {
if let Some(connection) = self.active_connections.get_mut(&subscriber.client_id) {
let packet = connection.state.handle_outgoing_publish(publish);
let message = RouterMessage::Packet(packet);
debug!("Forwarding publish to active connection. Id = {}, {:?}", subscriber.client_id, message);
message
} else {
return
};

self.forward(&subscriber.client_id, message);
connection.outgoing.push_back(packet);
}
}

fn forward(&mut self, id: &str, message: RouterMessage) {
Expand Down