Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic alias and message expiry support in broker #616

Merged
merged 22 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7553031
feat: store properties and expiry in logs
swanandx May 8, 2023
6baf993
feat: ignore expired messages
swanandx May 8, 2023
e160b74
fix: checking expired messages
swanandx May 8, 2023
686b17b
feat: broker handling topic alias
swanandx May 9, 2023
e47d07e
feat: set message expiry interval to received value minus time spent …
swanandx May 9, 2023
9675fe2
fix: case when time spent is equal to message expiry interval
swanandx May 10, 2023
d95f0d1
feat: broker can set topic alias for clients to use
swanandx May 10, 2023
a9c269a
feat: send disconnet packets to client
swanandx May 10, 2023
0587033
feat: using let-else instead of if-let
swanandx May 11, 2023
a6a5008
feat: entry in CHANGELOG
swanandx May 13, 2023
65ef8b7
fix: declare topic in outer scope
swanandx May 13, 2023
37e9aa0
fix: missing argument for Link
swanandx May 13, 2023
659c394
feat: include Reason in disconnection message
swanandx May 19, 2023
0bd88fc
feat: handle tenant prefix properly while forwarding, check max_alias…
swanandx May 19, 2023
a026f1f
feat: make broker_topic_alias optional
swanandx May 20, 2023
8b9b380
feat: validate and set topic alias in separate fn
swanandx May 22, 2023
1bee7c0
feat: comments regarding last will and storage
swanandx May 22, 2023
6f9800a
feat: extract all broker topic alias related things to separate struct
swanandx May 22, 2023
a49f1b2
feat: improve readability for message expiry
swanandx May 22, 2023
9216729
feat: typos and todo
swanandx May 23, 2023
4a09d96
feat: ignore connack properties in v4
swanandx May 23, 2023
0539166
feat: rename topic_alias_exists to topic_alias_already_exists
swanandx May 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Support for topic alias and message expiry in v5

### Changed
- Certificate paths configured in config file are checked during startup and throws a panic if it is not valid.
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/link/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ where
"Starting bridge with subscription on filter \"{}\"",
&config.sub_path,
);
let (mut tx, mut rx, _ack) = Link::new(None, &config.name, router_tx, true, None, true)?;
let (mut tx, mut rx, _ack) = Link::new(None, &config.name, router_tx, true, None, true, None)?;

'outer: loop {
let mut network = match network_connect(&config, &config.addr, protocol.clone()).await {
Expand Down
3 changes: 2 additions & 1 deletion rumqttd/src/link/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ impl ConsoleLink {
/// Requires the corresponding Router to be running to complete
pub fn new(config: ConsoleSettings, router_tx: Sender<(ConnectionId, Event)>) -> ConsoleLink {
let tx = router_tx.clone();
let (link_tx, link_rx, _ack) = Link::new(None, "console", tx, true, None, true).unwrap();
let (link_tx, link_rx, _ack) =
Link::new(None, "console", tx, true, None, true, None).unwrap();
let connection_id = link_tx.connection_id;
ConsoleLink {
config,
Expand Down
26 changes: 21 additions & 5 deletions rumqttd/src/link/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl Link {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: u16,
) -> (
Event,
Arc<Mutex<VecDeque<Packet>>>,
Expand All @@ -57,6 +58,7 @@ impl Link {
clean,
last_will,
dynamic_filters,
topic_alias_max,
);
let incoming = Incoming::new(connection.client_id.to_owned());
let (outgoing, link_rx) = Outgoing::new(connection.client_id.to_owned());
Expand All @@ -80,12 +82,19 @@ impl Link {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: Option<u16>,
) -> Result<(LinkTx, LinkRx, Notification), LinkError> {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
let (message, i, o, link_rx) = Link::prepare(
tenant_id,
client_id,
clean,
last_will,
dynamic_filters,
topic_alias_max.unwrap_or(0),
);
router_tx.send((0, message))?;

link_rx.recv()?;
Expand All @@ -110,20 +119,27 @@ impl Link {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: Option<u16>,
) -> Result<(LinkTx, LinkRx, ConnAck), LinkError> {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
let (message, i, o, link_rx) = Link::prepare(
tenant_id,
client_id,
clean,
last_will,
dynamic_filters,
topic_alias_max.unwrap_or(0),
);
router_tx.send_async((0, message)).await?;

link_rx.recv_async().await?;
let notification = o.lock().pop_front().unwrap();
// Right now link identifies failure with dropped rx in router,
// which is probably ok. We need this here to get id assigned by router
let (id, ack) = match notification {
Notification::DeviceAck(Ack::ConnAck(id, ack)) => (id, ack),
Notification::DeviceAck(Ack::ConnAck(id, ack, _)) => (id, ack),
_message => return Err(LinkError::NotConnectionAck),
};

Expand Down
10 changes: 7 additions & 3 deletions rumqttd/src/link/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ impl<P: Protocol> RemoteLink<P> {
})
.await??;

let (connect, lastwill, login) = match packet {
Packet::Connect(connect, _, _lastwill, _, login) => {
let (connect, props, lastwill, login) = match packet {
Packet::Connect(connect, props, _lastwill, _, login) => {
Span::current().record("client_id", &connect.client_id);

// Ignore last will
(connect, None, login)
(connect, props, None, login)
}
packet => return Err(Error::NotConnectPacket(packet)),
};
Expand Down Expand Up @@ -119,14 +119,18 @@ impl<P: Protocol> RemoteLink<P> {
return Err(Error::InvalidClientId);
}

let topic_alias_max = props.and_then(|p| p.topic_alias_max);

let (link_tx, link_rx, notification) = Link::new(
tenant_id,
&client_id,
router_tx,
clean_session,
lastwill,
dynamic_filters,
topic_alias_max,
)?;
h3nill marked this conversation as resolved.
Show resolved Hide resolved

let id = link_rx.id();
Span::current().record("connection_id", id);

Expand Down
1 change: 1 addition & 0 deletions rumqttd/src/link/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl ShadowLink {
true,
None,
config.dynamic_filters,
None,
)?;
let connection_id = link_rx.id();

Expand Down
4 changes: 2 additions & 2 deletions rumqttd/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub struct ConnAck {
pub code: ConnectReturnCode,
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnAckProperties {
pub session_expiry_interval: Option<u32>,
pub receive_max: Option<u16>,
Expand Down Expand Up @@ -246,7 +246,7 @@ impl Publish {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PublishProperties {
pub payload_format_indicator: Option<u8>,
pub message_expiry_interval: Option<u32>,
Expand Down
26 changes: 25 additions & 1 deletion rumqttd/src/router/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use slab::Slab;

use crate::protocol::LastWill;
use crate::Filter;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use super::ConnectionEvents;

Expand All @@ -22,6 +24,12 @@ pub struct Connection {
pub last_will: Option<LastWill>,
/// Connection events
pub events: ConnectionEvents,
/// Topic aliases set by clients
pub(crate) topic_aliases: HashMap<u16, Filter>,
swanandx marked this conversation as resolved.
Show resolved Hide resolved
/// Topic aliases used by broker
pub(crate) broker_topic_aliases: Option<HashMap<Filter, u16>>,
pub topic_alias_max: u16,
pub(crate) used_aliases: Slab<()>,
}

impl Connection {
Expand All @@ -32,6 +40,7 @@ impl Connection {
clean: bool,
last_will: Option<LastWill>,
dynamic_filters: bool,
topic_alias_max: u16,
) -> Connection {
// Change client id to -> tenant_id.client_id and derive topic path prefix
// to validate topics
Expand All @@ -44,6 +53,17 @@ impl Connection {
None => (client_id, None),
};

let mut used_aliases = Slab::new();
// occupy 0th index as 0 is invalid topic alias
assert_eq!(0, used_aliases.insert(()));

// topic_alias_max is 0, that means client doesn't want to use / support topic alias
swanandx marked this conversation as resolved.
Show resolved Hide resolved
let broker_topic_aliases = if topic_alias_max == 0 {
None
} else {
Some(HashMap::new())
};

Connection {
client_id,
tenant_prefix,
Expand All @@ -52,6 +72,10 @@ impl Connection {
subscriptions: HashSet::default(),
last_will,
events: ConnectionEvents::default(),
topic_aliases: HashMap::new(),
broker_topic_aliases,
topic_alias_max,
used_aliases,
}
}
}
84 changes: 74 additions & 10 deletions rumqttd/src/router/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use slab::Slab;
use tracing::trace;

use crate::protocol::{
matches, ConnAck, PingResp, PubAck, PubComp, PubRec, PubRel, Publish, SubAck, UnsubAck,
matches, ConnAck, ConnAckProperties, PingResp, PubAck, PubComp, PubRec, PubRel, Publish,
PublishProperties, SubAck, UnsubAck,
};
use crate::router::{DataRequest, FilterIdx, SubscriptionMeter, Waiters};
use crate::{ConnectionId, Filter, Offset, RouterConfig, Topic};
Expand All @@ -12,6 +13,34 @@ use crate::segments::{CommitLog, Position};
use crate::Storage;
use std::collections::{HashMap, VecDeque};
use std::io;
use std::time::Instant;

type PubWithProp = (Publish, Option<PublishProperties>);

#[derive(Clone)]
pub struct PublishData {
pub publish: Publish,
pub properties: Option<PublishProperties>,
pub timestamp: Instant,
}

impl From<PubWithProp> for PublishData {
fn from((publish, properties): PubWithProp) -> Self {
PublishData {
publish,
properties,
timestamp: Instant::now(),
swanandx marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// TODO: remove this from here
impl Storage for PublishData {
swanandx marked this conversation as resolved.
Show resolved Hide resolved
fn size(&self) -> usize {
let publish = &self.publish;
4 + publish.topic.len() + publish.payload.len()
}
}

/// Stores 'device' data and 'actions' data in native commitlog
/// organized by subscription filter. Device data is replicated
Expand All @@ -25,10 +54,10 @@ pub struct DataLog {
/// Also has waiters used to wake connections/replicator tracker
/// which are caught up with all the data on 'Filter' and waiting
/// for new data
pub native: Slab<Data<Publish>>,
pub native: Slab<Data<PublishData>>,
/// Map of subscription filter name to filter index
filter_indexes: HashMap<Filter, FilterIdx>,
retained_publishes: HashMap<Topic, Publish>,
retained_publishes: HashMap<Topic, PublishData>,
/// List of filters associated with a topic
publish_filters: HashMap<Topic, Vec<FilterIdx>>,
}
Expand Down Expand Up @@ -151,7 +180,7 @@ impl DataLog {
filter_idx: FilterIdx,
offset: Offset,
len: u64,
) -> io::Result<(Position, Vec<(Publish, Offset)>)> {
) -> io::Result<(Position, Vec<(PubWithProp, Offset)>)> {
// unwrap to get index of `self.native` is fine here, because when a new subscribe packet
// arrives in `Router::handle_device_payload`, it first calls the function
// `next_native_offset` which creates a new commitlog if one doesn't exist. So any new
Expand All @@ -163,12 +192,41 @@ impl DataLog {
// Encoding this information is important so that calling function
// has more information on how this method behaves.
let next = data.log.readv(offset, len, &mut o)?;

let now = Instant::now();
o.retain_mut(|(pubdata, _)| {
let Some(PublishProperties {
swanandx marked this conversation as resolved.
Show resolved Hide resolved
message_expiry_interval: Some(t),
..
}) = pubdata.properties.as_mut() else {
return true;
};

let time_spent = (now - pubdata.timestamp).as_secs() as u32;

let is_valid = time_spent < *t;

// ignore expired messages
if is_valid {
swanandx marked this conversation as resolved.
Show resolved Hide resolved
// set message_expiry_interval to (original value - time spent waiting in server)
*t -= time_spent;
}

is_valid
});

// no need to include timestamp when returning
let o = o
.into_iter()
.map(|(pubdata, offset)| ((pubdata.publish, pubdata.properties), offset))
.collect();

Ok((next, o))
}

pub fn shadow(&mut self, filter: &str) -> Option<Publish> {
pub fn shadow(&mut self, filter: &str) -> Option<PubWithProp> {
let data = self.native.get_mut(*self.filter_indexes.get(filter)?)?;
data.log.last()
data.log.last().map(|p| (p.publish, p.properties))
}

/// This method is called when the subscriber has caught up with the commit log. In which case,
Expand Down Expand Up @@ -196,8 +254,14 @@ impl DataLog {
inflight
}

pub fn insert_to_retained_publishes(&mut self, publish: Publish, topic: Topic) {
self.retained_publishes.insert(topic, publish);
pub fn insert_to_retained_publishes(
&mut self,
publish: Publish,
publish_properties: Option<PublishProperties>,
topic: Topic,
) {
let pub_with_props = (publish, publish_properties);
self.retained_publishes.insert(topic, pub_with_props.into());
}

pub fn remove_from_retained_publishes(&mut self, topic: Topic) {
Expand Down Expand Up @@ -285,8 +349,8 @@ impl AckLog {
}
}

pub fn connack(&mut self, id: ConnectionId, ack: ConnAck) {
let ack = Ack::ConnAck(id, ack);
pub fn connack(&mut self, id: ConnectionId, ack: ConnAck, props: Option<ConnAckProperties>) {
let ack = Ack::ConnAck(id, ack, props);
self.committed.push_back(ack);
}

Expand Down
Loading