diff --git a/rumqttd/CHANGELOG.md b/rumqttd/CHANGELOG.md index b7e0fad4..36354e95 100644 --- a/rumqttd/CHANGELOG.md +++ b/rumqttd/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Support for topic alias and message expiry in v5 ### Changed - Certificate paths configured in config file are checked during startup and throws a panic if it is not valid. diff --git a/rumqttd/src/link/bridge.rs b/rumqttd/src/link/bridge.rs index 1167d1f6..a7c33f62 100644 --- a/rumqttd/src/link/bridge.rs +++ b/rumqttd/src/link/bridge.rs @@ -59,7 +59,7 @@ where "Starting bridge with subscription on filter \"{}\"", &config.sub_path, ); - let (mut tx, mut rx, _ack) = Link::new(None, &config.name, router_tx, true, None, true)?; + let (mut tx, mut rx, _ack) = Link::new(None, &config.name, router_tx, true, None, true, None)?; 'outer: loop { let mut network = match network_connect(&config, &config.addr, protocol.clone()).await { diff --git a/rumqttd/src/link/console.rs b/rumqttd/src/link/console.rs index d905862b..67addc4e 100644 --- a/rumqttd/src/link/console.rs +++ b/rumqttd/src/link/console.rs @@ -23,7 +23,8 @@ impl ConsoleLink { /// Requires the corresponding Router to be running to complete pub fn new(config: ConsoleSettings, router_tx: Sender<(ConnectionId, Event)>) -> ConsoleLink { let tx = router_tx.clone(); - let (link_tx, link_rx, _ack) = Link::new(None, "console", tx, true, None, true).unwrap(); + let (link_tx, link_rx, _ack) = + Link::new(None, "console", tx, true, None, true, None).unwrap(); let connection_id = link_tx.connection_id; ConsoleLink { config, diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index d3e47929..de130dab 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -45,6 +45,7 @@ impl Link { clean: bool, last_will: Option, dynamic_filters: bool, + topic_alias_max: u16, ) -> ( Event, Arc>>, @@ -57,6 +58,7 @@ impl Link { clean, last_will, dynamic_filters, + topic_alias_max, ); let incoming = Incoming::new(connection.client_id.to_owned()); let (outgoing, link_rx) = Outgoing::new(connection.client_id.to_owned()); @@ -80,12 +82,19 @@ impl Link { clean: bool, last_will: Option, dynamic_filters: bool, + topic_alias_max: Option, ) -> Result<(LinkTx, LinkRx, Notification), LinkError> { // Connect to router // Local connections to the router shall have access to all subscriptions - let (message, i, o, link_rx) = - Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters); + let (message, i, o, link_rx) = Link::prepare( + tenant_id, + client_id, + clean, + last_will, + dynamic_filters, + topic_alias_max.unwrap_or(0), + ); router_tx.send((0, message))?; link_rx.recv()?; @@ -110,12 +119,19 @@ impl Link { clean: bool, last_will: Option, dynamic_filters: bool, + topic_alias_max: Option, ) -> Result<(LinkTx, LinkRx, ConnAck), LinkError> { // Connect to router // Local connections to the router shall have access to all subscriptions - let (message, i, o, link_rx) = - Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters); + let (message, i, o, link_rx) = Link::prepare( + tenant_id, + client_id, + clean, + last_will, + dynamic_filters, + topic_alias_max.unwrap_or(0), + ); router_tx.send_async((0, message)).await?; link_rx.recv_async().await?; @@ -123,7 +139,7 @@ impl Link { // Right now link identifies failure with dropped rx in router, // which is probably ok. We need this here to get id assigned by router let (id, ack) = match notification { - Notification::DeviceAck(Ack::ConnAck(id, ack)) => (id, ack), + Notification::DeviceAck(Ack::ConnAck(id, ack, _)) => (id, ack), _message => return Err(LinkError::NotConnectionAck), }; diff --git a/rumqttd/src/link/remote.rs b/rumqttd/src/link/remote.rs index 9f2e2086..666b0cc7 100644 --- a/rumqttd/src/link/remote.rs +++ b/rumqttd/src/link/remote.rs @@ -73,12 +73,12 @@ impl RemoteLink

{ }) .await??; - let (connect, lastwill, login) = match packet { - Packet::Connect(connect, _, _lastwill, _, login) => { + let (connect, props, lastwill, login) = match packet { + Packet::Connect(connect, props, _lastwill, _, login) => { Span::current().record("client_id", &connect.client_id); // Ignore last will - (connect, None, login) + (connect, props, None, login) } packet => return Err(Error::NotConnectPacket(packet)), }; @@ -119,6 +119,8 @@ impl RemoteLink

{ return Err(Error::InvalidClientId); } + let topic_alias_max = props.and_then(|p| p.topic_alias_max); + let (link_tx, link_rx, notification) = Link::new( tenant_id, &client_id, @@ -126,7 +128,9 @@ impl RemoteLink

{ clean_session, lastwill, dynamic_filters, + topic_alias_max, )?; + let id = link_rx.id(); Span::current().record("connection_id", id); diff --git a/rumqttd/src/link/shadow.rs b/rumqttd/src/link/shadow.rs index 09daf791..6aaa25f1 100644 --- a/rumqttd/src/link/shadow.rs +++ b/rumqttd/src/link/shadow.rs @@ -78,6 +78,7 @@ impl ShadowLink { true, None, config.dynamic_filters, + None, )?; let connection_id = link_rx.id(); diff --git a/rumqttd/src/protocol/mod.rs b/rumqttd/src/protocol/mod.rs index e1b3111e..4473b4aa 100644 --- a/rumqttd/src/protocol/mod.rs +++ b/rumqttd/src/protocol/mod.rs @@ -146,7 +146,7 @@ pub struct ConnAck { pub code: ConnectReturnCode, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct ConnAckProperties { pub session_expiry_interval: Option, pub receive_max: Option, @@ -246,7 +246,7 @@ impl Publish { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct PublishProperties { pub payload_format_indicator: Option, pub message_expiry_interval: Option, diff --git a/rumqttd/src/protocol/v4/mod.rs b/rumqttd/src/protocol/v4/mod.rs index 60d88820..aa454663 100644 --- a/rumqttd/src/protocol/v4/mod.rs +++ b/rumqttd/src/protocol/v4/mod.rs @@ -345,7 +345,10 @@ impl Protocol for V4 { Packet::Connect(connect, None, last_will, None, login) => { connect::write(&connect, &login, &last_will, buffer)? } - Packet::ConnAck(connack, None) => connack::write(&connack, buffer)?, + // TODO: set ConnAckProperties conditionally based on version + // currently we can't conditionally set them based on v5 or v4, + // so we ignore them, as properties can't be there in v4. + Packet::ConnAck(connack, _) => connack::write(&connack, buffer)?, Packet::Publish(publish, None) => publish::write(&publish, buffer)?, Packet::PubAck(puback, None) => puback::write(&puback, buffer)?, Packet::Subscribe(subscribe, None) => subscribe::write(&subscribe, buffer)?, diff --git a/rumqttd/src/router/connection.rs b/rumqttd/src/router/connection.rs index 419c03a8..e0201686 100644 --- a/rumqttd/src/router/connection.rs +++ b/rumqttd/src/router/connection.rs @@ -1,6 +1,8 @@ -use crate::protocol::LastWill; +use slab::Slab; + use crate::Filter; -use std::collections::HashSet; +use crate::{protocol::LastWill, Topic}; +use std::collections::{HashMap, HashSet}; use super::ConnectionEvents; @@ -22,6 +24,10 @@ pub struct Connection { pub last_will: Option, /// Connection events pub events: ConnectionEvents, + /// Topic aliases set by clients + pub(crate) topic_aliases: HashMap, + /// Topic aliases used by broker + pub(crate) broker_topic_aliases: Option, } impl Connection { @@ -32,6 +38,7 @@ impl Connection { clean: bool, last_will: Option, dynamic_filters: bool, + topic_alias_max: u16, ) -> Connection { // Change client id to -> tenant_id.client_id and derive topic path prefix // to validate topics @@ -44,6 +51,13 @@ impl Connection { None => (client_id, None), }; + // if topic_alias_max is 0, that means client doesn't want to use / support topic alias + let broker_topic_aliases = if topic_alias_max == 0 { + None + } else { + Some(BrokerAliases::new(topic_alias_max)) + }; + Connection { client_id, tenant_prefix, @@ -52,6 +66,61 @@ impl Connection { subscriptions: HashSet::default(), last_will, events: ConnectionEvents::default(), + topic_aliases: HashMap::new(), + broker_topic_aliases, + } + } +} + +#[derive(Debug)] +pub(crate) struct BrokerAliases { + pub(crate) broker_topic_aliases: HashMap, + pub(crate) used_aliases: Slab<()>, + pub(crate) topic_alias_max: u16, +} + +impl BrokerAliases { + fn new(topic_alias_max: u16) -> BrokerAliases { + let mut used_aliases = Slab::new(); + // occupy 0th index as 0 is invalid topic alias + assert_eq!(0, used_aliases.insert(())); + + let broker_topic_aliases = HashMap::new(); + + BrokerAliases { + broker_topic_aliases, + used_aliases, + topic_alias_max, + } + } + + // unset / remove the alias for topic + pub fn remove_alias(&mut self, topic: &str) { + if let Some(alias) = self.broker_topic_aliases.remove(topic) { + self.used_aliases.remove(alias as usize); } } + + // Get alias used for the topic, if it exists + pub fn get_alias(&self, topic: &str) -> Option { + self.broker_topic_aliases.get(topic).copied() + } + + // Set new alias for a topic and return the alias + // returns None if can't set new alias + pub fn set_new_alias(&mut self, topic: &str) -> Option { + let alias_to_use = self.used_aliases.insert(()); + + // NOTE: maybe we can use self.used_aliases.len() + // to check for availability of alias + if alias_to_use > self.topic_alias_max as usize { + self.used_aliases.remove(alias_to_use); + return None; + } + + let alias_to_use = alias_to_use as u16; + self.broker_topic_aliases + .insert(topic.to_owned(), alias_to_use); + Some(alias_to_use) + } } diff --git a/rumqttd/src/router/logs.rs b/rumqttd/src/router/logs.rs index efb76e36..c9799d7d 100644 --- a/rumqttd/src/router/logs.rs +++ b/rumqttd/src/router/logs.rs @@ -3,7 +3,8 @@ use slab::Slab; use tracing::trace; use crate::protocol::{ - matches, ConnAck, PingResp, PubAck, PubComp, PubRec, PubRel, Publish, SubAck, UnsubAck, + matches, ConnAck, ConnAckProperties, PingResp, PubAck, PubComp, PubRec, PubRel, Publish, + PublishProperties, SubAck, UnsubAck, }; use crate::router::{DataRequest, FilterIdx, SubscriptionMeter, Waiters}; use crate::{ConnectionId, Filter, Offset, RouterConfig, Topic}; @@ -12,6 +13,35 @@ use crate::segments::{CommitLog, Position}; use crate::Storage; use std::collections::{HashMap, VecDeque}; use std::io; +use std::time::Instant; + +type PubWithProp = (Publish, Option); + +#[derive(Clone)] +pub struct PublishData { + pub publish: Publish, + pub properties: Option, + pub timestamp: Instant, +} + +impl From for PublishData { + fn from((publish, properties): PubWithProp) -> Self { + PublishData { + publish, + properties, + timestamp: Instant::now(), + } + } +} + +// TODO: remove this from here +impl Storage for PublishData { + // TODO: calculate size of publish properties as well! + fn size(&self) -> usize { + let publish = &self.publish; + 4 + publish.topic.len() + publish.payload.len() + } +} /// Stores 'device' data and 'actions' data in native commitlog /// organized by subscription filter. Device data is replicated @@ -25,10 +55,10 @@ pub struct DataLog { /// Also has waiters used to wake connections/replicator tracker /// which are caught up with all the data on 'Filter' and waiting /// for new data - pub native: Slab>, + pub native: Slab>, /// Map of subscription filter name to filter index filter_indexes: HashMap, - retained_publishes: HashMap, + retained_publishes: HashMap, /// List of filters associated with a topic publish_filters: HashMap>, } @@ -151,7 +181,7 @@ impl DataLog { filter_idx: FilterIdx, offset: Offset, len: u64, - ) -> io::Result<(Position, Vec<(Publish, Offset)>)> { + ) -> io::Result<(Position, Vec<(PubWithProp, Offset)>)> { // unwrap to get index of `self.native` is fine here, because when a new subscribe packet // arrives in `Router::handle_device_payload`, it first calls the function // `next_native_offset` which creates a new commitlog if one doesn't exist. So any new @@ -163,12 +193,45 @@ impl DataLog { // Encoding this information is important so that calling function // has more information on how this method behaves. let next = data.log.readv(offset, len, &mut o)?; + + let now = Instant::now(); + o.retain_mut(|(pubdata, _)| { + // Keep data if no properties exists, which implies no message expiry! + let Some(properties) = pubdata.properties.as_mut() else { + return true + }; + + // Keep data if there is no message_expiry_interval + let Some(message_expiry_interval) = properties.message_expiry_interval.as_mut() else { + return true + }; + + let time_spent = (now - pubdata.timestamp).as_secs() as u32; + + let is_valid = time_spent < *message_expiry_interval; + + // ignore expired messages + if is_valid { + // set message_expiry_interval to (original value - time spent waiting in server) + // ref: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901112 + *message_expiry_interval -= time_spent; + } + + is_valid + }); + + // no need to include timestamp when returning + let o = o + .into_iter() + .map(|(pubdata, offset)| ((pubdata.publish, pubdata.properties), offset)) + .collect(); + Ok((next, o)) } - pub fn shadow(&mut self, filter: &str) -> Option { + pub fn shadow(&mut self, filter: &str) -> Option { let data = self.native.get_mut(*self.filter_indexes.get(filter)?)?; - data.log.last() + data.log.last().map(|p| (p.publish, p.properties)) } /// This method is called when the subscriber has caught up with the commit log. In which case, @@ -196,8 +259,14 @@ impl DataLog { inflight } - pub fn insert_to_retained_publishes(&mut self, publish: Publish, topic: Topic) { - self.retained_publishes.insert(topic, publish); + pub fn insert_to_retained_publishes( + &mut self, + publish: Publish, + publish_properties: Option, + topic: Topic, + ) { + let pub_with_props = (publish, publish_properties); + self.retained_publishes.insert(topic, pub_with_props.into()); } pub fn remove_from_retained_publishes(&mut self, topic: Topic) { @@ -285,8 +354,8 @@ impl AckLog { } } - pub fn connack(&mut self, id: ConnectionId, ack: ConnAck) { - let ack = Ack::ConnAck(id, ack); + pub fn connack(&mut self, id: ConnectionId, ack: ConnAck, props: Option) { + let ack = Ack::ConnAck(id, ack, props); self.committed.push_back(ack); } diff --git a/rumqttd/src/router/mod.rs b/rumqttd/src/router/mod.rs index 5e90e537..2def53a0 100644 --- a/rumqttd/src/router/mod.rs +++ b/rumqttd/src/router/mod.rs @@ -8,9 +8,9 @@ use serde::{Deserialize, Serialize}; use crate::{ protocol::{ - ConnAck, Packet, PingResp, PubAck, PubAckProperties, PubComp, PubCompProperties, PubRec, - PubRecProperties, PubRel, PubRelProperties, Publish, PublishProperties, SubAck, - SubAckProperties, UnsubAck, + ConnAck, ConnAckProperties, Disconnect, DisconnectProperties, Packet, PingResp, PubAck, + PubAckProperties, PubComp, PubCompProperties, PubRec, PubRecProperties, PubRel, + PubRelProperties, Publish, PublishProperties, SubAck, SubAckProperties, UnsubAck, }, ConnectionId, Filter, RouterId, Topic, }; @@ -69,8 +69,6 @@ pub enum Event { pub enum Notification { /// Data reply Forward(Forward), - /// Data reply - ForwardWithProperties(Forward, PublishProperties), /// Acks reply for connection data DeviceAck(Ack), /// Data reply @@ -87,6 +85,7 @@ pub enum Notification { /// Shadow Shadow(ShadowReply), Unschedule, + Disconnect(Disconnect, Option), } type MaybePacket = Option; @@ -95,9 +94,10 @@ type MaybePacket = Option; impl From for MaybePacket { fn from(notification: Notification) -> Self { let packet: Packet = match notification { - Notification::Forward(forward) => Packet::Publish(forward.publish, None), + Notification::Forward(forward) => Packet::Publish(forward.publish, forward.properties), Notification::DeviceAck(ack) => ack.into(), Notification::Unschedule => return None, + Notification::Disconnect(disconnect, props) => Packet::Disconnect(disconnect, props), v => { tracing::error!("Unexpected notification here, it cannot be converted into Packet, Notification: {:?}", v); return None; @@ -112,12 +112,16 @@ pub struct Forward { pub cursor: (u64, u64), pub size: usize, pub publish: Publish, + pub properties: Option, } #[derive(Debug, Clone)] #[allow(clippy::enum_variant_names)] pub enum Ack { - ConnAck(ConnectionId, ConnAck), + ConnAck(ConnectionId, ConnAck, Option), + // NOTE: using Option may be a better choice than new variant + // ConnAckWithProperties(ConnectionId, ConnAck, ConnAckProperties), + // TODO: merge the other variants as well using the same pattern PubAck(PubAck), PubAckWithProperties(PubAck, PubAckProperties), SubAck(SubAck), @@ -135,7 +139,7 @@ pub enum Ack { impl From for Packet { fn from(value: Ack) -> Self { match value { - Ack::ConnAck(_id, connack) => Packet::ConnAck(connack, None), + Ack::ConnAck(_id, connack, props) => Packet::ConnAck(connack, props), Ack::PubAck(puback) => Packet::PubAck(puback, None), Ack::PubAckWithProperties(puback, prop) => Packet::PubAck(puback, Some(prop)), Ack::SubAck(suback) => Packet::SubAck(suback, None), diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index e6cd1c22..8397ba47 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -1,6 +1,7 @@ use crate::protocol::{ - ConnAck, ConnectReturnCode, Packet, PingResp, PubAck, PubAckReason, PubComp, PubCompReason, - PubRel, PubRelReason, Publish, QoS, SubAck, SubscribeReasonCode, UnsubAck, + ConnAck, ConnAckProperties, ConnectReturnCode, Disconnect, DisconnectReasonCode, Packet, + PingResp, PubAck, PubAckReason, PubComp, PubCompReason, PubRel, PubRelReason, Publish, + PublishProperties, QoS, SubAck, SubscribeReasonCode, UnsubAck, }; use crate::router::alertlog::alert; use crate::router::graveyard::SavedState; @@ -18,6 +19,7 @@ use thiserror::Error; use tracing::{debug, error, info, trace, warn}; use super::alertlog::{Alert, AlertLog}; +use super::connection::BrokerAliases; use super::graveyard::Graveyard; use super::iobufs::{Incoming, Outgoing}; use super::logs::{AckLog, DataLog}; @@ -48,8 +50,12 @@ pub enum RouterError { InvalidFilterPrefix(Filter), #[error("Invalid client_id {0}")] InvalidClientId(String), + #[error("Disconnection (Reason: {0:?})")] + Disconnect(DisconnectReasonCode), } +const TOPIC_ALIAS_MAX: u16 = 4096; + pub struct Router { id: RouterId, /// Id of this router. Used to index native commitlog to store data from @@ -233,7 +239,9 @@ impl Router { Event::NewMeter(tx) => self.handle_new_meter(tx), Event::NewAlert(tx) => self.handle_new_alert(tx), Event::DeviceData => self.handle_device_payload(id), - Event::Disconnect(disconnect) => self.handle_disconnection(id, disconnect.execute_will), + Event::Disconnect(disconnect) => { + self.handle_disconnection(id, disconnect.execute_will, None) + } Event::Ready => self.scheduler.reschedule(id, ScheduleReason::Ready), Event::Shadow(request) => { retrieve_shadow(&mut self.datalog, &mut self.obufs[id], request) @@ -273,7 +281,7 @@ impl Router { "Duplicate client_id, dropping previous connection with connection_id: {}", connection_id ); - self.handle_disconnection(*connection_id, true); + self.handle_disconnection(*connection_id, true, None); } } @@ -334,8 +342,14 @@ impl Router { code: ConnectReturnCode::Success, }; + let properties = ConnAckProperties { + // TODO: set this to some appropriate value + topic_alias_max: Some(TOPIC_ALIAS_MAX), + ..Default::default() + }; + let ackslog = self.ackslog.get_mut(connection_id).unwrap(); - ackslog.connack(connection_id, ack); + ackslog.connack(connection_id, ack, Some(properties)); self.scheduler .reschedule(connection_id, ScheduleReason::Init); @@ -351,7 +365,12 @@ impl Router { let _alert_id = self.alerts.insert(tx); } - fn handle_disconnection(&mut self, id: ConnectionId, execute_last_will: bool) { + fn handle_disconnection( + &mut self, + id: ConnectionId, + execute_last_will: bool, + reason: Option, + ) { // Some clients can choose to send Disconnect packet before network disconnection. // This will lead to double Disconnect packets in router `events` let client_id = match &self.obufs.get(id) { @@ -365,12 +384,35 @@ impl Router { let span = tracing::info_span!("incoming_disconnect", client_id); let _guard = span.enter(); + // must handle last will before sending disconnect packet + // as the disconnecting client might have subscribed to will topic. if execute_last_will { self.handle_last_will(id); } info!("Disconnecting connection"); + if let Some(reason_code) = reason { + let outgoing = match self.obufs.get_mut(id) { + Some(v) => v, + None => { + error!("no-connection id {} is already gone", id); + return; + } + }; + + let disconnect = Disconnect { reason_code }; + + let disconnect_notification = Notification::Disconnect(disconnect, None); + + outgoing + .data_buffer + .lock() + .push_back(disconnect_notification); + + outgoing.handle.try_send(()).ok(); + } + // Remove connection from router let mut connection = self.connections.remove(id); let _incoming = self.ibufs.remove(id); @@ -380,7 +422,7 @@ impl Router { self.ackslog.remove(id); // Don't remove connection id from readyqueue with index. This will - // remove wrong connection from readyqueue. Instead just leave diconnected + // remove wrong connection from readyqueue. Instead just leave disconnected // connection in readyqueue and allow 'consume()' method to deal with this // self.readyqueue.remove(id); @@ -451,13 +493,14 @@ impl Router { let mut force_ack = false; let mut new_data = false; let mut disconnect = false; + let mut disconnect_reason: Option = None; let mut execute_will = true; // info!("{:15.15}[I] {:20} count = {}", client_id, "packets", packets.len()); for packet in packets.drain(0..) { match packet { - Packet::Publish(mut publish, _) => { + Packet::Publish(mut publish, properties) => { let span = tracing::error_span!("publish", topic = ?publish.topic, pkid = publish.pkid); let _guard = span.enter(); @@ -519,6 +562,7 @@ impl Router { match append_to_commitlog( id, publish.clone(), + properties, &mut self.datalog, &mut self.notifications, &mut self.connections, @@ -536,6 +580,11 @@ impl Router { ); self.router_meters.failed_publishes += 1; disconnect = true; + + if let RouterError::Disconnect(code) = e { + disconnect_reason = Some(code) + } + break; } }; @@ -617,6 +666,10 @@ impl Router { continue; } + if let Some(broker_aliases) = connection.broker_topic_aliases.as_mut() { + broker_aliases.remove_alias(filter); + } + let unsuback = UnsubAck { pkid, // reasons are used in MQTTv5 @@ -687,6 +740,7 @@ impl Router { match append_to_commitlog( id, publish, + None, &mut self.datalog, &mut self.notifications, &mut self.connections, @@ -751,7 +805,7 @@ impl Router { // on say 5th packet should not block new data notifications for packets // 1 - 4. Hence we use a flag instead of diconnecting immediately if disconnect { - self.handle_disconnection(id, execute_will); + self.handle_disconnection(id, execute_will, disconnect_reason); } } @@ -825,6 +879,9 @@ impl Router { // We always try to ack when ever a connection is scheduled ack_device_data(ackslog, outgoing); + let connection = &mut self.connections[id]; + let broker_topic_aliases = &mut connection.broker_topic_aliases; + // A new connection's tracker is always initialized with acks request. // A subscribe will register data request. // So a new connection is always scheduled with at least one request @@ -841,7 +898,13 @@ impl Router { } }; - match forward_device_data(&mut request, datalog, outgoing, alertlog) { + match forward_device_data( + &mut request, + datalog, + outgoing, + alertlog, + broker_topic_aliases, + ) { ConsumeStatus::BufferFull => { requests.push_back(request); self.scheduler.pause(id, PauseReason::Busy); @@ -888,9 +951,12 @@ impl Router { pkid: 0, payload: will.message, }; + + let properties = None; match append_to_commitlog( id, publish, + properties, &mut self.datalog, &mut self.notifications, &mut self.connections, @@ -951,15 +1017,27 @@ impl Router { fn append_to_commitlog( id: ConnectionId, mut publish: Publish, + mut properties: Option, datalog: &mut DataLog, notifications: &mut VecDeque<(ConnectionId, DataRequest)>, connections: &mut Slab, ) -> Result { + let connection = connections.get_mut(id).unwrap(); + + let topic_alias = properties.as_mut().and_then(|p| { + // clear the received value as it is irrelevant while forwarding publishes + p.topic_alias.take() + }); + + if let Some(alias) = topic_alias { + validate_and_set_topic_alias(&mut publish, connection, alias)?; + }; + let topic = std::str::from_utf8(&publish.topic)?; // Ensure that only clients associated with a tenant can publish to tenant's topic #[cfg(feature = "validate-tenant-prefix")] - if let Some(tenant_prefix) = &connections[id].tenant_prefix { + if let Some(tenant_prefix) = &connection.tenant_prefix { if !topic.starts_with(tenant_prefix) { return Err(RouterError::BadTenant( tenant_prefix.to_owned(), @@ -972,7 +1050,7 @@ fn append_to_commitlog( datalog.remove_from_retained_publishes(topic.to_owned()); } else if publish.retain { error!("Unexpected: retain field was not unset"); - datalog.insert_to_retained_publishes(publish.clone(), topic.to_owned()); + datalog.insert_to_retained_publishes(publish.clone(), properties.clone(), topic.to_owned()); } publish.retain = false; @@ -983,7 +1061,7 @@ fn append_to_commitlog( // Create a dynamic filter if dynamic_filters are enabled for this connection let filter_idxs = match filter_idxs { Some(v) => v, - None if connections[id].dynamic_filters => { + None if connection.dynamic_filters => { let (idx, _cursor) = datalog.next_native_offset(topic); vec![idx] } @@ -993,7 +1071,8 @@ fn append_to_commitlog( let mut o = (0, 0); for filter_idx in filter_idxs { let datalog = datalog.native.get_mut(filter_idx).unwrap(); - let (offset, filter) = datalog.append(publish.clone(), notifications); + let publish_data = (publish.clone(), properties.clone()); + let (offset, filter) = datalog.append(publish_data.into(), notifications); debug!( pkid, "Appended to commitlog: {}[{}, {})", filter, offset.0, offset.1, @@ -1006,6 +1085,39 @@ fn append_to_commitlog( Ok(o) } +fn validate_and_set_topic_alias( + publish: &mut Publish, + connection: &mut Connection, + alias: u16, +) -> Result<(), RouterError> { + if alias == 0 || alias > TOPIC_ALIAS_MAX { + error!("Alias must be greater than 0 and <={TOPIC_ALIAS_MAX}"); + return Err(RouterError::Disconnect( + DisconnectReasonCode::TopicAliasInvalid, + )); + } + + if publish.topic.is_empty() { + // if publish topic is empty, publisher must have set a valid alias + let Some(alias_topic) = connection.topic_aliases.get(&alias) else { + error!("Empty topic name with invalid alias"); + return Err(RouterError::Disconnect( + DisconnectReasonCode::ProtocolError, + )); + }; + // set the publish topic before further processing + publish.topic = alias_topic.to_owned().into(); + } else { + // if publish topic isn't empty, that means + // publisher wants to establish new mapping for topic & alias + let topic = std::str::from_utf8(&publish.topic)?; + connection.topic_aliases.insert(alias, topic.to_owned()); + trace!("set alias {alias} for topic {topic}"); + }; + + Ok(()) +} + /// Sweep ackslog for all the pending acks. /// We write everything to outgoing buf with out worrying about buffer size /// because acks most certainly won't cause memory bloat @@ -1059,6 +1171,7 @@ fn forward_device_data( datalog: &DataLog, outgoing: &mut Outgoing, alertlog: &mut AlertLog, + broker_topic_aliases: &mut Option, ) -> ConsumeStatus { let span = tracing::info_span!("outgoing_publish", client_id = outgoing.client_id); let _guard = span.enter(); @@ -1129,15 +1242,44 @@ fn forward_device_data( return ConsumeStatus::FilterCaughtup; } + let mut topic_alias = broker_topic_aliases + .as_ref() + .and_then(|aliases| aliases.get_alias(&request.filter)); + + let topic_alias_already_exists = topic_alias.is_some(); + + // if topic alias doesn't exists, try creating new one! + if !topic_alias_already_exists { + topic_alias = broker_topic_aliases + .as_mut() + .and_then(|broker_aliases| broker_aliases.set_new_alias(&request.filter)) + } + // Fill and notify device data - let forwards = publishes.into_iter().map(|(mut publish, offset)| { - publish.qos = protocol::qos(qos).unwrap(); - Forward { - cursor: offset, - size: 0, - publish, - } - }); + let forwards = publishes + .into_iter() + .map(|((mut publish, mut properties), offset)| { + publish.qos = protocol::qos(qos).unwrap(); + + // if there is some topic alias to use, set it in publish properties + if topic_alias.is_some() { + let mut props = properties.unwrap_or_default(); + props.topic_alias = topic_alias; + properties = Some(props); + } + + // We want to clear topic if we are using an existing alias + if topic_alias_already_exists { + publish.topic.clear() + } + + Forward { + cursor: offset, + size: 0, + publish, + properties, + } + }); let (len, inflight) = outgoing.push_forwards(forwards, qos, filter_idx); @@ -1168,7 +1310,7 @@ fn forward_device_data( fn retrieve_shadow(datalog: &mut DataLog, outgoing: &mut Outgoing, shadow: ShadowRequest) { if let Some(reply) = datalog.shadow(&shadow.filter) { - let publish = reply; + let publish = reply.0; let shadow_reply = router::ShadowReply { topic: publish.topic, payload: publish.payload, diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index 7ba756d4..cd818374 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -139,8 +139,15 @@ impl Broker { pub fn link(&self, client_id: &str) -> Result<(LinkTx, LinkRx), local::LinkError> { // Register this connection with the router. Router replies with ack which if ok will // start the link. Router can sometimes reject the connection (ex max connection limit) - let (link_tx, link_rx, _ack) = - Link::new(None, client_id, self.router_tx.clone(), true, None, false)?; + let (link_tx, link_rx, _ack) = Link::new( + None, + client_id, + self.router_tx.clone(), + true, + None, + false, + None, + )?; Ok((link_tx, link_rx)) }