Skip to content

Commit

Permalink
Topic alias and message expiry support in broker (bytebeamio#616)
Browse files Browse the repository at this point in the history
* feat: store properties and expiry in logs

* feat: ignore expired messages

* fix: checking expired messages

* feat: broker handling topic alias

* feat: set message expiry interval to received value minus time spent waiting

* fix: case when time spent is equal to message expiry interval

* feat: broker can set topic alias for clients to use

* feat: send disconnet packets to client

* feat: using let-else instead of if-let

* feat: entry in CHANGELOG

* fix: declare topic in outer scope

* fix: missing argument for Link

* feat: include Reason in disconnection message

* feat: handle tenant prefix properly while forwarding, check max_alias explicitly

* feat: make broker_topic_alias optional

* feat: validate and set topic alias in separate fn

* feat: comments regarding last will and storage

* feat: extract all broker topic alias related things to separate struct

* feat: improve readability for message expiry

* feat: typos and todo

* feat: ignore connack properties in v4

* feat: rename topic_alias_exists to topic_alias_already_exists
  • Loading branch information
swanandx authored May 29, 2023
1 parent ffefef9 commit 2da0608
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 58 deletions.
1 change: 1 addition & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Support for topic alias and message expiry in v5

### Changed
- Certificate paths configured in config file are checked during startup and throws a panic if it is not valid.
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/link/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ where
"Starting bridge with subscription on filter \"{}\"",
&config.sub_path,
);
let (mut tx, mut rx, _ack) = Link::new(None, &config.name, router_tx, true, None, true)?;
let (mut tx, mut rx, _ack) = Link::new(None, &config.name, router_tx, true, None, true, None)?;

'outer: loop {
let mut network = match network_connect(&config, &config.addr, protocol.clone()).await {
Expand Down
3 changes: 2 additions & 1 deletion rumqttd/src/link/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ impl ConsoleLink {
/// Requires the corresponding Router to be running to complete
pub fn new(config: ConsoleSettings, router_tx: Sender<(ConnectionId, Event)>) -> ConsoleLink {
let tx = router_tx.clone();
let (link_tx, link_rx, _ack) = Link::new(None, "console", tx, true, None, true).unwrap();
let (link_tx, link_rx, _ack) =
Link::new(None, "console", tx, true, None, true, None).unwrap();
let connection_id = link_tx.connection_id;
ConsoleLink {
config,
Expand Down
26 changes: 21 additions & 5 deletions rumqttd/src/link/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl Link {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: u16,
) -> (
Event,
Arc<Mutex<VecDeque<Packet>>>,
Expand All @@ -57,6 +58,7 @@ impl Link {
clean,
last_will,
dynamic_filters,
topic_alias_max,
);
let incoming = Incoming::new(connection.client_id.to_owned());
let (outgoing, link_rx) = Outgoing::new(connection.client_id.to_owned());
Expand All @@ -80,12 +82,19 @@ impl Link {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: Option<u16>,
) -> Result<(LinkTx, LinkRx, Notification), LinkError> {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
let (message, i, o, link_rx) = Link::prepare(
tenant_id,
client_id,
clean,
last_will,
dynamic_filters,
topic_alias_max.unwrap_or(0),
);
router_tx.send((0, message))?;

link_rx.recv()?;
Expand All @@ -110,20 +119,27 @@ impl Link {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: Option<u16>,
) -> Result<(LinkTx, LinkRx, ConnAck), LinkError> {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
let (message, i, o, link_rx) = Link::prepare(
tenant_id,
client_id,
clean,
last_will,
dynamic_filters,
topic_alias_max.unwrap_or(0),
);
router_tx.send_async((0, message)).await?;

link_rx.recv_async().await?;
let notification = o.lock().pop_front().unwrap();
// Right now link identifies failure with dropped rx in router,
// which is probably ok. We need this here to get id assigned by router
let (id, ack) = match notification {
Notification::DeviceAck(Ack::ConnAck(id, ack)) => (id, ack),
Notification::DeviceAck(Ack::ConnAck(id, ack, _)) => (id, ack),
_message => return Err(LinkError::NotConnectionAck),
};

Expand Down
10 changes: 7 additions & 3 deletions rumqttd/src/link/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ impl<P: Protocol> RemoteLink<P> {
})
.await??;

let (connect, lastwill, login) = match packet {
Packet::Connect(connect, _, _lastwill, _, login) => {
let (connect, props, lastwill, login) = match packet {
Packet::Connect(connect, props, _lastwill, _, login) => {
Span::current().record("client_id", &connect.client_id);

// Ignore last will
(connect, None, login)
(connect, props, None, login)
}
packet => return Err(Error::NotConnectPacket(packet)),
};
Expand Down Expand Up @@ -119,14 +119,18 @@ impl<P: Protocol> RemoteLink<P> {
return Err(Error::InvalidClientId);
}

let topic_alias_max = props.and_then(|p| p.topic_alias_max);

let (link_tx, link_rx, notification) = Link::new(
tenant_id,
&client_id,
router_tx,
clean_session,
lastwill,
dynamic_filters,
topic_alias_max,
)?;

let id = link_rx.id();
Span::current().record("connection_id", id);

Expand Down
1 change: 1 addition & 0 deletions rumqttd/src/link/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl ShadowLink {
true,
None,
config.dynamic_filters,
None,
)?;
let connection_id = link_rx.id();

Expand Down
4 changes: 2 additions & 2 deletions rumqttd/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub struct ConnAck {
pub code: ConnectReturnCode,
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnAckProperties {
pub session_expiry_interval: Option<u32>,
pub receive_max: Option<u16>,
Expand Down Expand Up @@ -246,7 +246,7 @@ impl Publish {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PublishProperties {
pub payload_format_indicator: Option<u8>,
pub message_expiry_interval: Option<u32>,
Expand Down
5 changes: 4 additions & 1 deletion rumqttd/src/protocol/v4/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,10 @@ impl Protocol for V4 {
Packet::Connect(connect, None, last_will, None, login) => {
connect::write(&connect, &login, &last_will, buffer)?
}
Packet::ConnAck(connack, None) => connack::write(&connack, buffer)?,
// TODO: set ConnAckProperties conditionally based on version
// currently we can't conditionally set them based on v5 or v4,
// so we ignore them, as properties can't be there in v4.
Packet::ConnAck(connack, _) => connack::write(&connack, buffer)?,
Packet::Publish(publish, None) => publish::write(&publish, buffer)?,
Packet::PubAck(puback, None) => puback::write(&puback, buffer)?,
Packet::Subscribe(subscribe, None) => subscribe::write(&subscribe, buffer)?,
Expand Down
73 changes: 71 additions & 2 deletions rumqttd/src/router/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::protocol::LastWill;
use slab::Slab;

use crate::Filter;
use std::collections::HashSet;
use crate::{protocol::LastWill, Topic};
use std::collections::{HashMap, HashSet};

use super::ConnectionEvents;

Expand All @@ -22,6 +24,10 @@ pub struct Connection {
pub last_will: Option<LastWill>,
/// Connection events
pub events: ConnectionEvents,
/// Topic aliases set by clients
pub(crate) topic_aliases: HashMap<u16, Topic>,
/// Topic aliases used by broker
pub(crate) broker_topic_aliases: Option<BrokerAliases>,
}

impl Connection {
Expand All @@ -32,6 +38,7 @@ impl Connection {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: u16,
) -> Connection {
// Change client id to -> tenant_id.client_id and derive topic path prefix
// to validate topics
Expand All @@ -44,6 +51,13 @@ impl Connection {
None => (client_id, None),
};

// if topic_alias_max is 0, that means client doesn't want to use / support topic alias
let broker_topic_aliases = if topic_alias_max == 0 {
None
} else {
Some(BrokerAliases::new(topic_alias_max))
};

Connection {
client_id,
tenant_prefix,
Expand All @@ -52,6 +66,61 @@ impl Connection {
subscriptions: HashSet::default(),
last_will,
events: ConnectionEvents::default(),
topic_aliases: HashMap::new(),
broker_topic_aliases,
}
}
}

#[derive(Debug)]
pub(crate) struct BrokerAliases {
pub(crate) broker_topic_aliases: HashMap<Filter, u16>,
pub(crate) used_aliases: Slab<()>,
pub(crate) topic_alias_max: u16,
}

impl BrokerAliases {
fn new(topic_alias_max: u16) -> BrokerAliases {
let mut used_aliases = Slab::new();
// occupy 0th index as 0 is invalid topic alias
assert_eq!(0, used_aliases.insert(()));

let broker_topic_aliases = HashMap::new();

BrokerAliases {
broker_topic_aliases,
used_aliases,
topic_alias_max,
}
}

// unset / remove the alias for topic
pub fn remove_alias(&mut self, topic: &str) {
if let Some(alias) = self.broker_topic_aliases.remove(topic) {
self.used_aliases.remove(alias as usize);
}
}

// Get alias used for the topic, if it exists
pub fn get_alias(&self, topic: &str) -> Option<u16> {
self.broker_topic_aliases.get(topic).copied()
}

// Set new alias for a topic and return the alias
// returns None if can't set new alias
pub fn set_new_alias(&mut self, topic: &str) -> Option<u16> {
let alias_to_use = self.used_aliases.insert(());

// NOTE: maybe we can use self.used_aliases.len()
// to check for availability of alias
if alias_to_use > self.topic_alias_max as usize {
self.used_aliases.remove(alias_to_use);
return None;
}

let alias_to_use = alias_to_use as u16;
self.broker_topic_aliases
.insert(topic.to_owned(), alias_to_use);
Some(alias_to_use)
}
}
Loading

0 comments on commit 2da0608

Please sign in to comment.