diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f841b55..ff16da95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,24 @@ +### R5 +---------------- +mqttbytes v0.2.0 +----------- +- **feature** Complete mqtt 5 implementation +- **fix** Split mqtt 4 and 5 into modules [**breaking**] + +rumqttc v0.5.0 +----------- +- **changed** Update to mqttbytes 0.2 [**breaking**] + +rumqttd v0.4.0 +----------- +- **changed** Update to mqttbytes 0.2 [**breaking**] + +misc +---------- +- Add mqtt 4 and 5 parsers to benchmarking suite for comparsion + + + ### R4 ---------------- mqttbytes v0.1.0 diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 5a6ac37f..b7cb63ad 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "benchmarks" -version = "0.2.0" +version = "0.3.0" authors = ["tekjar "] edition = "2018" diff --git a/benchmarks/parsers/common.rs b/benchmarks/parsers/common.rs index 05b69182..e38bc497 100644 --- a/benchmarks/parsers/common.rs +++ b/benchmarks/parsers/common.rs @@ -21,6 +21,8 @@ pub(crate) struct Print { pub(crate) messages: usize, pub(crate) payload_size: usize, pub(crate) total_size_gb: f32, - pub(crate) write_throughput_gpbs: f32, - pub(crate) read_throughput_gpbs: f32, + pub(crate) v4_write_throughput_gpbs: f32, + pub(crate) v4_read_throughput_gpbs: f32, + pub(crate) v5_write_throughput_gpbs: f32, + pub(crate) v5_read_throughput_gpbs: f32, } diff --git a/benchmarks/parsers/mqttbytes.rs b/benchmarks/parsers/mqttbytes.rs index a08059b2..464d9b5c 100644 --- a/benchmarks/parsers/mqttbytes.rs +++ b/benchmarks/parsers/mqttbytes.rs @@ -1,5 +1,5 @@ use bytes::{Buf, BytesMut}; -use mqttbytes::v4::{read, Publish}; +use mqttbytes::{v4, v5}; use mqttbytes::QoS; use std::time::Instant; @@ -13,11 +13,12 @@ fn main() { pretty_env_logger::init(); let count = 1024 * 1024; let payload_size = 1024; - let data = generate_data(count, payload_size); let guard = pprof::ProfilerGuard::new(100).unwrap(); + + // ------------------------- v4 write throughput ------------------------------- + let data = generate_v4_data(count, payload_size); let mut output = BytesMut::with_capacity(10 * 1024); - // ------------------------- write throughput ------------------------------- let start = Instant::now(); for publish in data.into_iter() { publish.write(&mut output).unwrap(); @@ -26,21 +27,49 @@ fn main() { let elapsed_micros = start.elapsed().as_micros(); let total_size = output.len(); let throughput = (total_size * 1000_000) / elapsed_micros as usize; - let write_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0; + let v4_write_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0; + + // --------------------------- v4 read throughput ------------------------------- + + let start = Instant::now(); + let mut packets = Vec::with_capacity(count); + while output.has_remaining() { + let packet = v4::read(&mut output, 10 * 1024).unwrap(); + packets.push(packet); + } + + let elapsed_micros = start.elapsed().as_micros(); + let throughput = (total_size * 1000_000) / elapsed_micros as usize; + let v4_read_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0; + + // ------------------------- v5 write throughput ------------------------------- + let data = generate_v5_data(count, payload_size); + let mut output = BytesMut::with_capacity(10 * 1024); + + let start = Instant::now(); + for publish in data.into_iter() { + publish.write(&mut output).unwrap(); + } + + let elapsed_micros = start.elapsed().as_micros(); + let total_size = output.len(); + let throughput = (total_size * 1000_000) / elapsed_micros as usize; + let v5_write_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0; let total_size_gb = total_size as f32 / 1024.0 / 1024.0 / 1024.0; - // --------------------------- read throughput ------------------------------- + // --------------------------- v5 read throughput ------------------------------- let start = Instant::now(); let mut packets = Vec::with_capacity(count); while output.has_remaining() { - let packet = read(&mut output, 10 * 1024).unwrap(); + let packet = v5::read(&mut output, 10 * 1024).unwrap(); packets.push(packet); } let elapsed_micros = start.elapsed().as_micros(); let throughput = (total_size * 1000_000) / elapsed_micros as usize; - let read_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0; + let v5_read_throughput = throughput as f32 / 1024.0 / 1024.0 / 1024.0; + // --------------------------- results --------------------------------------- @@ -49,21 +78,34 @@ fn main() { messages: count, payload_size, total_size_gb, - write_throughput_gpbs: write_throughput, - read_throughput_gpbs: read_throughput, + v4_write_throughput_gpbs: v4_write_throughput, + v4_read_throughput_gpbs: v4_read_throughput, + v5_write_throughput_gpbs: v5_write_throughput, + v5_read_throughput_gpbs: v5_read_throughput }; println!("{}", serde_json::to_string_pretty(&print).unwrap()); common::profile("bench.pb", guard); } -fn generate_data(count: usize, payload_size: usize) -> Vec { +fn generate_v4_data(count: usize, payload_size: usize) -> Vec { let mut data = Vec::with_capacity(count); for i in 0..count { - let mut publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; payload_size]); + let mut publish = v4::Publish::new("hello/world", QoS::AtLeastOnce, vec![1; payload_size]); publish.pkid = (i % 100 + 1) as u16; data.push(publish); } data } + +fn generate_v5_data(count: usize, payload_size: usize) -> Vec { + let mut data = Vec::with_capacity(count); + for i in 0..count { + let mut publish = v5::Publish::new("hello/world", QoS::AtLeastOnce, vec![1; payload_size]); + publish.pkid = (i % 100 + 1) as u16; + data.push(publish); + } + + data +} \ No newline at end of file diff --git a/mqttbytes/Cargo.toml b/mqttbytes/Cargo.toml index cd8c0a37..7cdf3d9e 100644 --- a/mqttbytes/Cargo.toml +++ b/mqttbytes/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqttbytes" -version = "0.1.0" +version = "0.2.0" readme = "README.md" authors = ["tekjar "] edition = "2018" diff --git a/rumqttc/Cargo.toml b/rumqttc/Cargo.toml index 0bf89dc5..122d8c63 100644 --- a/rumqttc/Cargo.toml +++ b/rumqttc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rumqttc" -version = "0.4.0" +version = "0.5.0" description = "An efficient and robust mqtt client for your connected devices" license = "Apache-2.0" repository = "https://github.com/bytebeamio/rumqtt" @@ -23,7 +23,7 @@ webpki = "0.21" tokio-rustls = "0.22" async-tungstenite = { version = "0.11.0", default-features = false, features = ["tokio-rustls"], optional = true } ws_stream_tungstenite = { version = "0.4.0", default-features = false, features = ["tokio_io"], optional = true } -mqttbytes = { path = "../mqttbytes", version = "0.1" } +mqttbytes = { path = "../mqttbytes", version = "0.2" } pollster = "0.2" async-channel = "1.5" log = "0.4" diff --git a/rumqttd/Cargo.toml b/rumqttd/Cargo.toml index a5162447..1ee5c240 100644 --- a/rumqttd/Cargo.toml +++ b/rumqttd/Cargo.toml @@ -2,7 +2,7 @@ name = "rumqttd" description = "Distributed, embeddable mqtt broker library" license = "Apache-2.0" -version = "0.3.0" +version = "0.4.0" authors = ["tekjar "] edition = "2018" keywords = ["mqtt", "broker", "iot", "kafka", "nats"] @@ -18,8 +18,8 @@ name = "rumqttd" path = "src/bin.rs" [dependencies] -rumqttlog = { path = "../rumqttlog", version = "0.4"} -mqttbytes = { path = "../mqttbytes", version = "0.1" } +rumqttlog = { path = "../rumqttlog", version = "0.5"} +mqttbytes = { path = "../mqttbytes", version = "0.2" } tokio = { version = "1.0", features = ["full"] } tokio-rustls = "0.22" tokio-compat-02 = "0.2" diff --git a/rumqttlog/Cargo.toml b/rumqttlog/Cargo.toml index 0dd019bb..bb557a24 100644 --- a/rumqttlog/Cargo.toml +++ b/rumqttlog/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rumqttlog" -version = "0.4.0" +version = "0.5.0" authors = ["tekjar "] edition = "2018" license = "Apache-2.0" @@ -11,7 +11,7 @@ description = "kafka inspired rumqtt's mqtt commitlog" byteorder = "1" memmap = "0.7" bytes = "1.0" -mqttbytes = { path = "../mqttbytes", version = "0.1" } +mqttbytes = { path = "../mqttbytes", version = "0.2" } serde = { version = "1", features = ["derive", "rc"] } segments = "0.1" thiserror = "1"