Skip to content

Commit

Permalink
Separate router (#48)
Browse files Browse the repository at this point in the history
* Create router

* Improve documentation and set default throttle to 0

* Update version for release

Co-authored-by: tekjar <raviteja@bytebeam.io>
  • Loading branch information
Ravi Teja and tekjar authored Mar 17, 2020
1 parent 90e09e2 commit bb40a10
Show file tree
Hide file tree
Showing 18 changed files with 947 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
members = [
"rumq-core",
"rumq-client",
"rumq-router",
"rumq-broker",
"rumq-cli"
]
4 changes: 2 additions & 2 deletions rumq-broker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rumq-broker"
version = "0.1.0-alpha.6"
version = "0.1.0-alpha.7"
description = "Library for embeddable mqtt broker"
license = "MIT"
repository = "https://github.com/tekjar/rumq"
Expand All @@ -14,7 +14,7 @@ tokio = { version = "0.2", features = ["full"] }
tokio-util = { version = "0.2", features = ["codec"] }
futures-util = { version = "0.3", features = ["sink"] }
tokio-rustls = "0.12"
rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.6" }
rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.7" }
derive_more = { version = "0.99", default-features = false, features = ["from"] }
log = "0.4"
serde = { version = "1", features = ["derive"] }
Expand Down
6 changes: 2 additions & 4 deletions rumq-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rumq-client"
version = "0.1.0-alpha.7"
version = "0.1.0-alpha.8"
description = "An efficeint and robust mqtt client for your connected devices"
license = "MIT"
repository = "https://github.com/tekjar/rumq"
Expand All @@ -17,10 +17,8 @@ futures-util = { version = "0.3", features = ["sink"] }
async-stream = "0.2"
webpki = "0.21"
tokio-rustls = "0.12"
rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.6" }
rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.7" }
log = "0.4"
pin-project = "0.4"


[dev-dependencies]
pretty_env_logger = "0.3.1"
Expand Down
5 changes: 2 additions & 3 deletions rumq-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ pub use eventloop::eventloop;
pub use eventloop::{EventLoopError, MqttEventLoop};
pub use state::MqttState;

#[doc(hidden)]
pub use rumq_core::mqtt4::{publish, subscribe, Connect, PacketIdentifier, Publish, QoS, Suback, Subscribe, Unsubscribe};
pub use rumq_core::mqtt4::*;

/// Includes incoming packets from the network and other interesting events happening in the eventloop
#[derive(Debug)]
Expand Down Expand Up @@ -287,7 +286,7 @@ impl MqttOptions {
max_packet_size: 256 * 1024,
request_channel_capacity: 10,
notification_channel_capacity: 10,
throttle: Duration::from_micros(10),
throttle: Duration::from_micros(0),
inflight: 100,
}
}
Expand Down
2 changes: 1 addition & 1 deletion rumq-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rumq-core"
version = "0.1.0-alpha.6"
version = "0.1.0-alpha.7"
description = "Serializes and deserializes mqtt byte stream"
license = "MIT"
repository = "https://github.com/tekjar/rumq"
Expand Down
1 change: 1 addition & 0 deletions rumq-core/src/mqtt4/asyncdeserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::mqtt4::*;
use async_trait::async_trait;
use tokio::io::AsyncReadExt;

/// Mqtt awareness on top of tokio's `AsyncRead`
#[async_trait]
pub trait AsyncMqttRead: AsyncReadExt + Unpin {
async fn async_mqtt_read(&mut self) -> Result<Packet, Error> {
Expand Down
1 change: 1 addition & 0 deletions rumq-core/src/mqtt4/asyncserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::mqtt4::*;
use async_trait::async_trait;
use tokio::io::AsyncWriteExt;

/// Mqtt awareness on top of tokio's `AsyncWrite`
#[async_trait]
pub trait AsyncMqttWrite: AsyncWriteExt + Unpin {
async fn async_mqtt_write(&mut self, packet: &Packet) -> Result<(), Error> {
Expand Down
2 changes: 2 additions & 0 deletions rumq-core/src/mqtt4/codec.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! This module describes how to serialize and deserialize mqtt 4 packets
use bytes::buf::Buf;
use bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder};
Expand All @@ -9,6 +10,7 @@ use crate::mqtt4::Packet;
use crate::mqtt4::{MqttRead, MqttWrite};
use crate::Error;

/// MqttCodec knows how to serialize and deserialize mqtt packets from a raw stream of bytes
pub struct MqttCodec {
max_payload_size: usize,
}
Expand Down
1 change: 1 addition & 0 deletions rumq-core/src/mqtt4/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::mqtt4::*;
use byteorder::ReadBytesExt;
use std::io::Read;

/// Mqtt awareness on top of `Read`
pub trait MqttRead: ReadBytesExt {
fn mqtt_read(&mut self) -> Result<Packet, Error> {
let packet_type = self.read_u8()?;
Expand Down
26 changes: 14 additions & 12 deletions rumq-core/src/mqtt4/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use topic::*;

use crate::Error;

/// Quality of service
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub enum QoS {
Expand All @@ -26,7 +27,7 @@ pub enum QoS {

#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PacketType {
enum PacketType {
Connect = 1,
Connack,
Publish,
Expand All @@ -43,6 +44,7 @@ pub enum PacketType {
Disconnect,
}

/// Encapsulates all the possible mqtt packets
#[derive(Debug, Clone, PartialEq)]
pub enum Packet {
Connect(Connect),
Expand All @@ -61,16 +63,16 @@ pub enum Packet {
Disconnect,
}

/// 7 3 0
/// +--------------------------+--------------------------+
/// byte 1 | MQTT Control Packet Type | Flags for each type |
/// +--------------------------+--------------------------+
/// | Remaining Bytes Len (1 - 4 bytes) |
/// +-----------------------------------------------------+
///
/// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_2.2_-
// 7 3 0
// +--------------------------+--------------------------+
// byte 1 | MQTT Control Packet Type | Flags for each type |
// +--------------------------+--------------------------+
// | Remaining Bytes Len (1 - 4 bytes) |
// +-----------------------------------------------------+
//
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_2.2_-

pub fn qos(num: u8) -> Result<QoS, Error> {
fn qos(num: u8) -> Result<QoS, Error> {
match num {
0 => Ok(QoS::AtMostOnce),
1 => Ok(QoS::AtLeastOnce),
Expand All @@ -79,7 +81,7 @@ pub fn qos(num: u8) -> Result<QoS, Error> {
}
}

pub fn packet_type(num: u8) -> Result<PacketType, Error> {
fn packet_type(num: u8) -> Result<PacketType, Error> {
match num {
1 => Ok(PacketType::Connect),
2 => Ok(PacketType::Connack),
Expand All @@ -99,7 +101,7 @@ pub fn packet_type(num: u8) -> Result<PacketType, Error> {
}
}

pub fn connect_return(num: u8) -> Result<ConnectReturnCode, Error> {
fn connect_return(num: u8) -> Result<ConnectReturnCode, Error> {
match num {
0 => Ok(ConnectReturnCode::Accepted),
1 => Ok(ConnectReturnCode::BadUsernamePassword),
Expand Down
23 changes: 22 additions & 1 deletion rumq-core/src/mqtt4/packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ use derive_more::From;
use crate::mqtt4::QoS;
use std::fmt;

/// Packet identifier for packets types that require broker to acknowledge
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, From)]
pub struct PacketIdentifier(pub u16);

/// Mqtt protocol version
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
MQTT(u8),
}

/// Mqtt connect packet representation
#[derive(Clone, PartialEq)]
pub struct Connect {
/// Mqtt protocol version
Expand All @@ -29,6 +32,7 @@ pub struct Connect {
pub password: Option<String>,
}

/// Creates a new mqtt connect packet
pub fn connect<S: Into<String>>(id: S) -> Connect {
Connect {
protocol: Protocol::MQTT(4),
Expand All @@ -42,17 +46,19 @@ pub fn connect<S: Into<String>>(id: S) -> Connect {
}

impl Connect {
/// Sets username
pub fn set_username<S: Into<String>>(&mut self, u: S) -> &mut Connect {
self.username = Some(u.into());
self
}

/// Sets password
pub fn set_password<S: Into<String>>(&mut self, p: S) -> &mut Connect {
self.password = Some(p.into());
self
}

pub fn len(&self) -> usize {
pub(crate) fn len(&self) -> usize {
let mut len = 8 + "MQTT".len() + self.client_id.len();

// lastwill len
Expand All @@ -74,6 +80,7 @@ impl Connect {
}
}

/// Connection return code sent by the server
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u8)]
pub enum ConnectReturnCode {
Expand All @@ -85,16 +92,19 @@ pub enum ConnectReturnCode {
NotAuthorized,
}

/// Connack packet
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Connack {
pub session_present: bool,
pub code: ConnectReturnCode,
}

/// Creates a new connack packet
pub fn connack(code: ConnectReturnCode, session_present: bool) -> Connack {
Connack { code, session_present }
}

/// Last will of the connection
#[derive(Debug, Clone, PartialEq)]
pub struct LastWill {
pub topic: String,
Expand All @@ -103,6 +113,7 @@ pub struct LastWill {
pub retain: bool,
}

/// Publish packet
#[derive(Clone, PartialEq)]
pub struct Publish {
pub dup: bool,
Expand All @@ -113,6 +124,7 @@ pub struct Publish {
pub payload: Vec<u8>,
}

/// Creates a new publish packet
pub fn publish<S: Into<String>, P: Into<Vec<u8>>>(topic: S, qos: QoS, payload: P) -> Publish {
Publish {
dup: false,
Expand All @@ -125,18 +137,21 @@ pub fn publish<S: Into<String>, P: Into<Vec<u8>>>(topic: S, qos: QoS, payload: P
}

impl Publish {
/// Sets packet identifier
pub fn set_pkid<P: Into<PacketIdentifier>>(&mut self, pkid: P) -> &mut Self {
self.pkid = Some(pkid.into());
self
}
}

/// Subscriber packet
#[derive(Clone, PartialEq)]
pub struct Subscribe {
pub pkid: PacketIdentifier,
pub topics: Vec<SubscribeTopic>,
}

/// Creates a new subscription packet
pub fn subscribe<S: Into<String>>(topic: S, qos: QoS) -> Subscribe {
let topic = SubscribeTopic {
topic_path: topic.into(),
Expand All @@ -149,6 +164,7 @@ pub fn subscribe<S: Into<String>>(topic: S, qos: QoS) -> Subscribe {
}
}

/// Creates an empty subscription packet
pub fn empty_subscribe() -> Subscribe {
Subscribe {
pkid: PacketIdentifier(0),
Expand All @@ -164,28 +180,33 @@ impl Subscribe {
}
}

/// Subscription topic
#[derive(Clone, PartialEq)]
pub struct SubscribeTopic {
pub topic_path: String,
pub qos: QoS,
}

/// Subscription return code sent by the broker
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeReturnCodes {
Success(QoS),
Failure,
}

/// Subscription acknowledgement
#[derive(Debug, Clone, PartialEq)]
pub struct Suback {
pub pkid: PacketIdentifier,
pub return_codes: Vec<SubscribeReturnCodes>,
}

/// Creates a new subscription acknowledgement packet
pub fn suback(pkid: PacketIdentifier, return_codes: Vec<SubscribeReturnCodes>) -> Suback {
Suback { pkid, return_codes }
}

/// Unsubscribe packet
#[derive(Debug, Clone, PartialEq)]
pub struct Unsubscribe {
pub pkid: PacketIdentifier,
Expand Down
1 change: 1 addition & 0 deletions rumq-core/src/mqtt4/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::mqtt4::*;

use byteorder::WriteBytesExt;

/// Mqtt awareness on top of `Write`
pub trait MqttWrite: WriteBytesExt {
fn mqtt_write(&mut self, packet: &Packet) -> Result<(), Error> {
match packet {
Expand Down
14 changes: 7 additions & 7 deletions rumq-core/src/mqtt4/topic.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// checks if a topic or topic filter has wildcards
/// Checks if a topic or topic filter has wildcards
pub fn has_wildcards(s: &str) -> bool {
s.contains("+") || s.contains("#")
}

/// checks if a topic is valid
/// Checks if a topic is valid
pub fn valid_topic(topic: &str) -> bool {
if topic.contains("+") {
return false;
Expand All @@ -16,7 +16,7 @@ pub fn valid_topic(topic: &str) -> bool {
true
}

/// checks if the filter is valid
/// Checks if the filter is valid
/// https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106
pub fn valid_filter(filter: &str) -> bool {
if filter.len() == 0 {
Expand Down Expand Up @@ -44,10 +44,9 @@ pub fn valid_filter(filter: &str) -> bool {
true
}

/// checks if topic matches a filter. topic and filter validation isn't done here.
/// NOTE: 'topic' is a misnomer in the arg. this can also be used to match 2 wild subscriptions
/// NOTE: make sure a topic is validated during a publish and filter is validated
/// during a subscribe
/// Checks if topic matches a filter. topic and filter validation isn't done here.
/// **note** 'topic' is a misnomer in the arg. This can also be used to match 2 wild subscriptions.
/// **note** Make sure a topic is validated during a publish and filter is validated during a subscribe
pub fn matches(topic: &str, filter: &str) -> bool {
if topic.len() > 0 && topic[..1].contains("$") {
return false;
Expand Down Expand Up @@ -82,6 +81,7 @@ pub fn matches(topic: &str, filter: &str) -> bool {

true
}

#[cfg(test)]
mod test {
#[test]
Expand Down
13 changes: 13 additions & 0 deletions rumq-router/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "rumq-router"
version = "0.1.0-alpha.1"
authors = ["tekjar <raviteja@bytebeam.io>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "0.2", features = ["full"] }
rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.6" }
derive_more = { version = "0.99", default-features = false, features = ["from"] }
log = "0.4"
Loading

0 comments on commit bb40a10

Please sign in to comment.