diff --git a/rumqttc/examples/subscription_ids.rs b/rumqttc/examples/subscription_ids.rs new file mode 100644 index 00000000..0c1a014e --- /dev/null +++ b/rumqttc/examples/subscription_ids.rs @@ -0,0 +1,82 @@ +use rumqttc::v5::mqttbytes::v5::SubscribeProperties; +use rumqttc::v5::mqttbytes::QoS; +use tokio::{task, time}; + +use rumqttc::v5::{AsyncClient, MqttOptions}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main(worker_threads = 1)] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + task::spawn(async move { + requests(client).await; + time::sleep(Duration::from_secs(3)).await; + }); + + while let Ok(event) = eventloop.poll().await { + println!("{:?}", event); + } + + Ok(()) +} + +async fn requests(client: AsyncClient) { + let props = SubscribeProperties { + id: Some(1), + user_properties: vec![], + }; + + client + .subscribe_with_properties("hello/world", QoS::AtMostOnce, props) + .await + .unwrap(); + + let props = SubscribeProperties { + id: Some(2), + user_properties: vec![], + }; + + client + .subscribe_with_properties("hello/#", QoS::AtMostOnce, props) + .await + .unwrap(); + + time::sleep(Duration::from_millis(500)).await; + // we will receive two publishes + // one due to hello/world and other due to hello/# + // both will have respective subscription ids + client + .publish( + "hello/world", + QoS::AtMostOnce, + false, + "both having subscription IDs!", + ) + .await + .unwrap(); + + time::sleep(Duration::from_millis(500)).await; + client.unsubscribe("hello/#").await.unwrap(); + client.subscribe("hello/#", QoS::AtMostOnce).await.unwrap(); + time::sleep(Duration::from_millis(500)).await; + + // we will receive two publishes + // but only one will have subscription ID + // cuz we unsubscribed to hello/# and then + // subscribed without properties! + client + .publish( + "hello/world", + QoS::AtMostOnce, + false, + "Only one with subscription ID!", + ) + .await + .unwrap(); +} diff --git a/rumqttd/CHANGELOG.md b/rumqttd/CHANGELOG.md index 90f2cf80..cc1118d5 100644 --- a/rumqttd/CHANGELOG.md +++ b/rumqttd/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Subscription IDs in v5 publish packets (#632) ### Changed diff --git a/rumqttd/src/router/connection.rs b/rumqttd/src/router/connection.rs index e0201686..cccebad4 100644 --- a/rumqttd/src/router/connection.rs +++ b/rumqttd/src/router/connection.rs @@ -28,6 +28,8 @@ pub struct Connection { pub(crate) topic_aliases: HashMap, /// Topic aliases used by broker pub(crate) broker_topic_aliases: Option, + /// subscription IDs for a connection + pub(crate) subscription_ids: HashMap, } impl Connection { @@ -68,6 +70,7 @@ impl Connection { events: ConnectionEvents::default(), topic_aliases: HashMap::new(), broker_topic_aliases, + subscription_ids: HashMap::new(), } } } diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 9fa28ef2..6901bab3 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -20,7 +20,6 @@ use thiserror::Error; use tracing::{debug, error, info, trace, warn}; use super::alertlog::{Alert, AlertLog}; -use super::connection::BrokerAliases; use super::graveyard::Graveyard; use super::iobufs::{Incoming, Outgoing}; use super::logs::{AckLog, DataLog}; @@ -615,7 +614,7 @@ impl Router { ); }; } - Packet::Subscribe(subscribe, _) => { + Packet::Subscribe(subscribe, props) => { let mut return_codes = Vec::new(); let pkid = subscribe.pkid; // let len = s.len(); @@ -637,9 +636,24 @@ impl Router { let filter = &f.path; let qos = f.qos; + let subscription_id = props.as_ref().and_then(|p| p.id); + + if subscription_id == Some(0) { + error!("Subscription identifier can't be 0"); + disconnect = true; + disconnect_reason = Some(DisconnectReasonCode::ProtocolError); + break; + } let (idx, cursor) = self.datalog.next_native_offset(filter); - self.prepare_filter(id, cursor, idx, filter.clone(), qos as u8); + self.prepare_filter( + id, + cursor, + idx, + filter.clone(), + qos as u8, + subscription_id, + ); self.datalog .handle_retained_messages(filter, &mut self.notifications); @@ -689,6 +703,9 @@ impl Router { broker_aliases.remove_alias(filter); } + // remove the subscription id + connection.subscription_ids.remove(filter); + let unsuback = UnsubAck { pkid, // reasons are used in MQTTv5 @@ -856,6 +873,7 @@ impl Router { filter_idx: FilterIdx, filter: String, qos: u8, + subscription_id: Option, ) { // Add connection id to subscription list match self.subscription_map.get_mut(&filter) { @@ -872,6 +890,12 @@ impl Router { // Prepare consumer to pull data in case of subscription let connection = self.connections.get_mut(id).unwrap(); + if let Some(subscription_id) = subscription_id { + connection + .subscription_ids + .insert(filter.clone(), subscription_id); + } + if connection.subscriptions.insert(filter.clone()) { let request = DataRequest { filter: filter.clone(), @@ -919,7 +943,6 @@ impl Router { ack_device_data(ackslog, outgoing); let connection = &mut self.connections[id]; - let broker_topic_aliases = &mut connection.broker_topic_aliases; // A new connection's tracker is always initialized with acks request. // A subscribe will register data request. @@ -937,13 +960,7 @@ impl Router { } }; - match forward_device_data( - &mut request, - datalog, - outgoing, - alertlog, - broker_topic_aliases, - ) { + match forward_device_data(&mut request, datalog, outgoing, alertlog, connection) { ConsumeStatus::BufferFull => { requests.push_back(request); self.scheduler.pause(id, PauseReason::Busy); @@ -1068,6 +1085,17 @@ fn append_to_commitlog( p.topic_alias.take() }); + // TODO: broker should properly send the disconnect packet! + if properties + .as_ref() + .is_some_and(|p| !p.subscription_identifiers.is_empty()) + { + error!("A PUBLISH packet sent from a Client to a Server MUST NOT contain a Subscription Identifier"); + return Err(RouterError::Disconnect( + DisconnectReasonCode::MalformedPacket, + )); + } + if let Some(alias) = topic_alias { validate_and_set_topic_alias(&mut publish, connection, alias)?; }; @@ -1210,7 +1238,7 @@ fn forward_device_data( datalog: &DataLog, outgoing: &mut Outgoing, alertlog: &mut AlertLog, - broker_topic_aliases: &mut Option, + connection: &mut Connection, ) -> ConsumeStatus { let span = tracing::info_span!("outgoing_publish", client_id = outgoing.client_id); let _guard = span.enter(); @@ -1282,6 +1310,7 @@ fn forward_device_data( return ConsumeStatus::FilterCaughtup; } + let broker_topic_aliases = &mut connection.broker_topic_aliases; let mut topic_alias = broker_topic_aliases .as_ref() .and_then(|aliases| aliases.get_alias(&request.filter)); @@ -1295,6 +1324,8 @@ fn forward_device_data( .and_then(|broker_aliases| broker_aliases.set_new_alias(&request.filter)) } + let subscription_id = connection.subscription_ids.get(&request.filter); + // Fill and notify device data let forwards = publishes .into_iter() @@ -1313,6 +1344,13 @@ fn forward_device_data( publish.topic.clear() } + if let Some(&subscription_id) = subscription_id { + // create new props if not already exists + let mut props = properties.unwrap_or_default(); + props.subscription_identifiers.push(subscription_id); + properties = Some(props); + } + Forward { cursor: offset, size: 0,