Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic alias and message expiry support in broker #616

Merged
merged 22 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7553031
feat: store properties and expiry in logs
swanandx May 8, 2023
6baf993
feat: ignore expired messages
swanandx May 8, 2023
e160b74
fix: checking expired messages
swanandx May 8, 2023
686b17b
feat: broker handling topic alias
swanandx May 9, 2023
e47d07e
feat: set message expiry interval to received value minus time spent …
swanandx May 9, 2023
9675fe2
fix: case when time spent is equal to message expiry interval
swanandx May 10, 2023
d95f0d1
feat: broker can set topic alias for clients to use
swanandx May 10, 2023
a9c269a
feat: send disconnet packets to client
swanandx May 10, 2023
0587033
feat: using let-else instead of if-let
swanandx May 11, 2023
a6a5008
feat: entry in CHANGELOG
swanandx May 13, 2023
65ef8b7
fix: declare topic in outer scope
swanandx May 13, 2023
37e9aa0
fix: missing argument for Link
swanandx May 13, 2023
659c394
feat: include Reason in disconnection message
swanandx May 19, 2023
0bd88fc
feat: handle tenant prefix properly while forwarding, check max_alias…
swanandx May 19, 2023
a026f1f
feat: make broker_topic_alias optional
swanandx May 20, 2023
8b9b380
feat: validate and set topic alias in separate fn
swanandx May 22, 2023
1bee7c0
feat: comments regarding last will and storage
swanandx May 22, 2023
6f9800a
feat: extract all broker topic alias related things to separate struct
swanandx May 22, 2023
a49f1b2
feat: improve readability for message expiry
swanandx May 22, 2023
9216729
feat: typos and todo
swanandx May 23, 2023
4a09d96
feat: ignore connack properties in v4
swanandx May 23, 2023
0539166
feat: rename topic_alias_exists to topic_alias_already_exists
swanandx May 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Support for topic alias and message expiry in v5

### Changed
- Certificate paths configured in config file are checked during startup and throws a panic if it is not valid.
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/link/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ where
"Starting bridge with subscription on filter \"{}\"",
&config.sub_path,
);
let (mut tx, mut rx, _ack) = Link::new(None, &config.name, router_tx, true, None, true)?;
let (mut tx, mut rx, _ack) = Link::new(None, &config.name, router_tx, true, None, true, None)?;

'outer: loop {
let mut network = match network_connect(&config, &config.addr, protocol.clone()).await {
Expand Down
3 changes: 2 additions & 1 deletion rumqttd/src/link/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ impl ConsoleLink {
/// Requires the corresponding Router to be running to complete
pub fn new(config: ConsoleSettings, router_tx: Sender<(ConnectionId, Event)>) -> ConsoleLink {
let tx = router_tx.clone();
let (link_tx, link_rx, _ack) = Link::new(None, "console", tx, true, None, true).unwrap();
let (link_tx, link_rx, _ack) =
Link::new(None, "console", tx, true, None, true, None).unwrap();
let connection_id = link_tx.connection_id;
ConsoleLink {
config,
Expand Down
26 changes: 21 additions & 5 deletions rumqttd/src/link/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl Link {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: u16,
) -> (
Event,
Arc<Mutex<VecDeque<Packet>>>,
Expand All @@ -57,6 +58,7 @@ impl Link {
clean,
last_will,
dynamic_filters,
topic_alias_max,
);
let incoming = Incoming::new(connection.client_id.to_owned());
let (outgoing, link_rx) = Outgoing::new(connection.client_id.to_owned());
Expand All @@ -80,12 +82,19 @@ impl Link {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: Option<u16>,
) -> Result<(LinkTx, LinkRx, Notification), LinkError> {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
let (message, i, o, link_rx) = Link::prepare(
tenant_id,
client_id,
clean,
last_will,
dynamic_filters,
topic_alias_max.unwrap_or(0),
);
router_tx.send((0, message))?;

link_rx.recv()?;
Expand All @@ -110,20 +119,27 @@ impl Link {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: Option<u16>,
) -> Result<(LinkTx, LinkRx, ConnAck), LinkError> {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
let (message, i, o, link_rx) = Link::prepare(
tenant_id,
client_id,
clean,
last_will,
dynamic_filters,
topic_alias_max.unwrap_or(0),
);
router_tx.send_async((0, message)).await?;

link_rx.recv_async().await?;
let notification = o.lock().pop_front().unwrap();
// Right now link identifies failure with dropped rx in router,
// which is probably ok. We need this here to get id assigned by router
let (id, ack) = match notification {
Notification::DeviceAck(Ack::ConnAck(id, ack)) => (id, ack),
Notification::DeviceAck(Ack::ConnAck(id, ack, _)) => (id, ack),
_message => return Err(LinkError::NotConnectionAck),
};

Expand Down
10 changes: 7 additions & 3 deletions rumqttd/src/link/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ impl<P: Protocol> RemoteLink<P> {
})
.await??;

let (connect, lastwill, login) = match packet {
Packet::Connect(connect, _, _lastwill, _, login) => {
let (connect, props, lastwill, login) = match packet {
Packet::Connect(connect, props, _lastwill, _, login) => {
Span::current().record("client_id", &connect.client_id);

// Ignore last will
(connect, None, login)
(connect, props, None, login)
}
packet => return Err(Error::NotConnectPacket(packet)),
};
Expand Down Expand Up @@ -119,14 +119,18 @@ impl<P: Protocol> RemoteLink<P> {
return Err(Error::InvalidClientId);
}

let topic_alias_max = props.and_then(|p| p.topic_alias_max);

let (link_tx, link_rx, notification) = Link::new(
tenant_id,
&client_id,
router_tx,
clean_session,
lastwill,
dynamic_filters,
topic_alias_max,
)?;
h3nill marked this conversation as resolved.
Show resolved Hide resolved

let id = link_rx.id();
Span::current().record("connection_id", id);

Expand Down
1 change: 1 addition & 0 deletions rumqttd/src/link/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl ShadowLink {
true,
None,
config.dynamic_filters,
None,
)?;
let connection_id = link_rx.id();

Expand Down
4 changes: 2 additions & 2 deletions rumqttd/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub struct ConnAck {
pub code: ConnectReturnCode,
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnAckProperties {
pub session_expiry_interval: Option<u32>,
pub receive_max: Option<u16>,
Expand Down Expand Up @@ -246,7 +246,7 @@ impl Publish {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PublishProperties {
pub payload_format_indicator: Option<u8>,
pub message_expiry_interval: Option<u32>,
Expand Down
5 changes: 4 additions & 1 deletion rumqttd/src/protocol/v4/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,10 @@ impl Protocol for V4 {
Packet::Connect(connect, None, last_will, None, login) => {
connect::write(&connect, &login, &last_will, buffer)?
}
Packet::ConnAck(connack, None) => connack::write(&connack, buffer)?,
// TODO: set ConnAckProperties conditionally based on version
// currently we can't conditionally set them based on v5 or v4,
// so we ignore them, as properties can't be there in v4.
Packet::ConnAck(connack, _) => connack::write(&connack, buffer)?,
Packet::Publish(publish, None) => publish::write(&publish, buffer)?,
Packet::PubAck(puback, None) => puback::write(&puback, buffer)?,
Packet::Subscribe(subscribe, None) => subscribe::write(&subscribe, buffer)?,
Expand Down
73 changes: 71 additions & 2 deletions rumqttd/src/router/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::protocol::LastWill;
use slab::Slab;

use crate::Filter;
use std::collections::HashSet;
use crate::{protocol::LastWill, Topic};
use std::collections::{HashMap, HashSet};

use super::ConnectionEvents;

Expand All @@ -22,6 +24,10 @@ pub struct Connection {
pub last_will: Option<LastWill>,
/// Connection events
pub events: ConnectionEvents,
/// Topic aliases set by clients
pub(crate) topic_aliases: HashMap<u16, Topic>,
/// Topic aliases used by broker
pub(crate) broker_topic_aliases: Option<BrokerAliases>,
}

impl Connection {
Expand All @@ -32,6 +38,7 @@ impl Connection {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: u16,
) -> Connection {
// Change client id to -> tenant_id.client_id and derive topic path prefix
// to validate topics
Expand All @@ -44,6 +51,13 @@ impl Connection {
None => (client_id, None),
};

// if topic_alias_max is 0, that means client doesn't want to use / support topic alias
let broker_topic_aliases = if topic_alias_max == 0 {
None
} else {
Some(BrokerAliases::new(topic_alias_max))
};

Connection {
client_id,
tenant_prefix,
Expand All @@ -52,6 +66,61 @@ impl Connection {
subscriptions: HashSet::default(),
last_will,
events: ConnectionEvents::default(),
topic_aliases: HashMap::new(),
broker_topic_aliases,
}
}
}

#[derive(Debug)]
pub(crate) struct BrokerAliases {
pub(crate) broker_topic_aliases: HashMap<Filter, u16>,
pub(crate) used_aliases: Slab<()>,
pub(crate) topic_alias_max: u16,
}

impl BrokerAliases {
fn new(topic_alias_max: u16) -> BrokerAliases {
let mut used_aliases = Slab::new();
// occupy 0th index as 0 is invalid topic alias
assert_eq!(0, used_aliases.insert(()));

let broker_topic_aliases = HashMap::new();

BrokerAliases {
broker_topic_aliases,
used_aliases,
topic_alias_max,
}
}

// unset / remove the alias for topic
pub fn remove_alias(&mut self, topic: &str) {
if let Some(alias) = self.broker_topic_aliases.remove(topic) {
self.used_aliases.remove(alias as usize);
}
}

// Get alias used for the topic, if it exists
pub fn get_alias(&self, topic: &str) -> Option<u16> {
self.broker_topic_aliases.get(topic).copied()
}

// Set new alias for a topic and return the alias
// returns None if can't set new alias
pub fn set_new_alias(&mut self, topic: &str) -> Option<u16> {
let alias_to_use = self.used_aliases.insert(());

// NOTE: maybe we can use self.used_aliases.len()
// to check for availability of alias
if alias_to_use > self.topic_alias_max as usize {
self.used_aliases.remove(alias_to_use);
return None;
}

let alias_to_use = alias_to_use as u16;
self.broker_topic_aliases
.insert(topic.to_owned(), alias_to_use);
Some(alias_to_use)
}
}
Loading