From 75b02d1153aa30a433e5afe27d0fbbbd9ac25b3e Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Tue, 5 Sep 2023 20:59:29 +0530 Subject: [PATCH 1/2] feat: non-consuming builder for Connection --- rumqttd/src/link/local.rs | 9 +++++---- rumqttd/src/router/connection.rs | 34 ++++++++++++++++++++------------ rumqttd/src/router/routing.rs | 2 +- 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index 75b4883f..44b08b5e 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -100,15 +100,16 @@ impl<'a> LinkBuilder<'a> { pub fn build(self) -> Result<(LinkTx, LinkRx, Notification), LinkError> { // Connect to router // Local connections to the router shall have access to all subscriptions - let connection = Connection::new( + let mut connection = Connection::new( self.tenant_id, self.client_id.to_owned(), self.clean_session, - self.last_will, - self.last_will_properties, self.dynamic_filters, - self.topic_alias_max, ); + + connection + .last_will(self.last_will, self.last_will_properties) + .topic_alias_max(self.topic_alias_max); let incoming = Incoming::new(connection.client_id.to_owned()); let (outgoing, link_rx) = Outgoing::new(connection.client_id.to_owned()); let outgoing_data_buffer = outgoing.buffer(); diff --git a/rumqttd/src/router/connection.rs b/rumqttd/src/router/connection.rs index ff5f485d..992376cb 100644 --- a/rumqttd/src/router/connection.rs +++ b/rumqttd/src/router/connection.rs @@ -41,10 +41,7 @@ impl Connection { tenant_id: Option, client_id: String, clean: bool, - last_will: Option, - last_will_properties: Option, dynamic_filters: bool, - topic_alias_max: u16, ) -> Connection { // Change client id to -> tenant_id.client_id and derive topic path prefix // to validate topics @@ -57,27 +54,38 @@ impl Connection { None => (client_id, None), }; - // if topic_alias_max is 0, that means client doesn't want to use / support topic alias - let broker_topic_aliases = if topic_alias_max == 0 { - None - } else { - Some(BrokerAliases::new(topic_alias_max)) - }; - Connection { client_id, tenant_prefix, dynamic_filters, clean, subscriptions: HashSet::default(), - last_will, - last_will_properties, + last_will: None, + last_will_properties: None, events: ConnectionEvents::default(), topic_aliases: HashMap::new(), - broker_topic_aliases, + broker_topic_aliases: None, subscription_ids: HashMap::new(), } } + + pub fn topic_alias_max(&mut self, max: u16) -> &mut Connection { + // if topic_alias_max is 0, that means client doesn't want to use / support topic alias + if max > 0 { + self.broker_topic_aliases = Some(BrokerAliases::new(max)); + } + self + } + + pub fn last_will( + &mut self, + will: Option, + props: Option, + ) -> &mut Connection { + self.last_will = will; + self.last_will_properties = props; + self + } } #[derive(Debug)] diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 56b995b6..3f1e960e 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -1092,7 +1092,7 @@ impl Router { let tenant_prefix = tenant_id.map(|id| format!("/tenants/{id}/")); let Some((will, will_props)) = self.last_wills.remove(&client_id) else { - return + return; }; let publish = Publish { From b89101b5ecb3ffaf5aa27bb7c19ac008d122d941 Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Tue, 5 Sep 2023 21:01:40 +0530 Subject: [PATCH 2/2] chore: CHANGELOG entry --- rumqttd/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/rumqttd/CHANGELOG.md b/rumqttd/CHANGELOG.md index 1d1e4ee6..d6bee3c0 100644 --- a/rumqttd/CHANGELOG.md +++ b/rumqttd/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Will delay interval for MQTTv5 (#686) ### Changed +- Non-consuming builder pattern for constructing Connection ### Deprecated