Skip to content

Commit

Permalink
R5 (#231)
Browse files Browse the repository at this point in the history
* Update parser benchmarks

* Update versions and changelog

* Fix dependency versions

Co-authored-by: tekjar <raviteja@bytebeam.io>
  • Loading branch information
Ravi Teja and tekjar authored Jan 19, 2021
1 parent 63c5997 commit 2ebb875
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 22 deletions.
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
### R5
----------------
mqttbytes v0.2.0
-----------
- **feature** Complete mqtt 5 implementation
- **fix** Split mqtt 4 and 5 into modules [**breaking**]

rumqttc v0.5.0
-----------
- **changed** Update to mqttbytes 0.2 [**breaking**]

rumqttd v0.4.0
-----------
- **changed** Update to mqttbytes 0.2 [**breaking**]

misc
----------
- Add mqtt 4 and 5 parsers to benchmarking suite for comparsion



### R4
----------------
mqttbytes v0.1.0
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "benchmarks"
version = "0.2.0"
version = "0.3.0"
authors = ["tekjar <raviteja@bytebeam.io>"]
edition = "2018"

Expand Down
6 changes: 4 additions & 2 deletions benchmarks/parsers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub(crate) struct Print {
pub(crate) messages: usize,
pub(crate) payload_size: usize,
pub(crate) total_size_gb: f32,
pub(crate) write_throughput_gpbs: f32,
pub(crate) read_throughput_gpbs: f32,
pub(crate) v4_write_throughput_gpbs: f32,
pub(crate) v4_read_throughput_gpbs: f32,
pub(crate) v5_write_throughput_gpbs: f32,
pub(crate) v5_read_throughput_gpbs: f32,
}
64 changes: 53 additions & 11 deletions benchmarks/parsers/mqttbytes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::{Buf, BytesMut};
use mqttbytes::v4::{read, Publish};
use mqttbytes::{v4, v5};
use mqttbytes::QoS;
use std::time::Instant;

Expand All @@ -13,11 +13,12 @@ fn main() {
pretty_env_logger::init();
let count = 1024 * 1024;
let payload_size = 1024;
let data = generate_data(count, payload_size);
let guard = pprof::ProfilerGuard::new(100).unwrap();

// ------------------------- v4 write throughput -------------------------------
let data = generate_v4_data(count, payload_size);
let mut output = BytesMut::with_capacity(10 * 1024);

// ------------------------- write throughput -------------------------------
let start = Instant::now();
for publish in data.into_iter() {
publish.write(&mut output).unwrap();
Expand All @@ -26,21 +27,49 @@ fn main() {
let elapsed_micros = start.elapsed().as_micros();
let total_size = output.len();
let throughput = (total_size * 1000_000) / elapsed_micros as usize;
let write_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0;
let v4_write_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0;

// --------------------------- v4 read throughput -------------------------------

let start = Instant::now();
let mut packets = Vec::with_capacity(count);
while output.has_remaining() {
let packet = v4::read(&mut output, 10 * 1024).unwrap();
packets.push(packet);
}

let elapsed_micros = start.elapsed().as_micros();
let throughput = (total_size * 1000_000) / elapsed_micros as usize;
let v4_read_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0;

// ------------------------- v5 write throughput -------------------------------
let data = generate_v5_data(count, payload_size);
let mut output = BytesMut::with_capacity(10 * 1024);

let start = Instant::now();
for publish in data.into_iter() {
publish.write(&mut output).unwrap();
}

let elapsed_micros = start.elapsed().as_micros();
let total_size = output.len();
let throughput = (total_size * 1000_000) / elapsed_micros as usize;
let v5_write_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0;
let total_size_gb = total_size as f32 / 1024.0 / 1024.0 / 1024.0;

// --------------------------- read throughput -------------------------------
// --------------------------- v5 read throughput -------------------------------

let start = Instant::now();
let mut packets = Vec::with_capacity(count);
while output.has_remaining() {
let packet = read(&mut output, 10 * 1024).unwrap();
let packet = v5::read(&mut output, 10 * 1024).unwrap();
packets.push(packet);
}

let elapsed_micros = start.elapsed().as_micros();
let throughput = (total_size * 1000_000) / elapsed_micros as usize;
let read_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0;
let v5_read_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0;


// --------------------------- results ---------------------------------------

Expand All @@ -49,21 +78,34 @@ fn main() {
messages: count,
payload_size,
total_size_gb,
write_throughput_gpbs: write_throughput,
read_throughput_gpbs: read_throughput,
v4_write_throughput_gpbs: v4_write_throughput,
v4_read_throughput_gpbs: v4_read_throughput,
v5_write_throughput_gpbs: v5_write_throughput,
v5_read_throughput_gpbs: v5_read_throughput
};

println!("{}", serde_json::to_string_pretty(&print).unwrap());
common::profile("bench.pb", guard);
}

fn generate_data(count: usize, payload_size: usize) -> Vec<Publish> {
fn generate_v4_data(count: usize, payload_size: usize) -> Vec<v4::Publish> {
let mut data = Vec::with_capacity(count);
for i in 0..count {
let mut publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; payload_size]);
let mut publish = v4::Publish::new("hello/world", QoS::AtLeastOnce, vec![1; payload_size]);
publish.pkid = (i % 100 + 1) as u16;
data.push(publish);
}

data
}

fn generate_v5_data(count: usize, payload_size: usize) -> Vec<v5::Publish> {
let mut data = Vec::with_capacity(count);
for i in 0..count {
let mut publish = v5::Publish::new("hello/world", QoS::AtLeastOnce, vec![1; payload_size]);
publish.pkid = (i % 100 + 1) as u16;
data.push(publish);
}

data
}
2 changes: 1 addition & 1 deletion mqttbytes/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqttbytes"
version = "0.1.0"
version = "0.2.0"
readme = "README.md"
authors = ["tekjar <raviteja@bytebeam.io>"]
edition = "2018"
Expand Down
4 changes: 2 additions & 2 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rumqttc"
version = "0.4.0"
version = "0.5.0"
description = "An efficient and robust mqtt client for your connected devices"
license = "Apache-2.0"
repository = "https://github.com/bytebeamio/rumqtt"
Expand All @@ -23,7 +23,7 @@ webpki = "0.21"
tokio-rustls = "0.22"
async-tungstenite = { version = "0.11.0", default-features = false, features = ["tokio-rustls"], optional = true }
ws_stream_tungstenite = { version = "0.4.0", default-features = false, features = ["tokio_io"], optional = true }
mqttbytes = { path = "../mqttbytes", version = "0.1" }
mqttbytes = { path = "../mqttbytes", version = "0.2" }
pollster = "0.2"
async-channel = "1.5"
log = "0.4"
Expand Down
6 changes: 3 additions & 3 deletions rumqttd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "rumqttd"
description = "Distributed, embeddable mqtt broker library"
license = "Apache-2.0"
version = "0.3.0"
version = "0.4.0"
authors = ["tekjar <raviteja@bytebeam.io>"]
edition = "2018"
keywords = ["mqtt", "broker", "iot", "kafka", "nats"]
Expand All @@ -18,8 +18,8 @@ name = "rumqttd"
path = "src/bin.rs"

[dependencies]
rumqttlog = { path = "../rumqttlog", version = "0.4"}
mqttbytes = { path = "../mqttbytes", version = "0.1" }
rumqttlog = { path = "../rumqttlog", version = "0.5"}
mqttbytes = { path = "../mqttbytes", version = "0.2" }
tokio = { version = "1.0", features = ["full"] }
tokio-rustls = "0.22"
tokio-compat-02 = "0.2"
Expand Down
4 changes: 2 additions & 2 deletions rumqttlog/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rumqttlog"
version = "0.4.0"
version = "0.5.0"
authors = ["tekjar <raviteja@bytebeam.io>"]
edition = "2018"
license = "Apache-2.0"
Expand All @@ -11,7 +11,7 @@ description = "kafka inspired rumqtt's mqtt commitlog"
byteorder = "1"
memmap = "0.7"
bytes = "1.0"
mqttbytes = { path = "../mqttbytes", version = "0.1" }
mqttbytes = { path = "../mqttbytes", version = "0.2" }
serde = { version = "1", features = ["derive", "rc"] }
segments = "0.1"
thiserror = "1"
Expand Down

0 comments on commit 2ebb875

Please sign in to comment.