Skip to content

Commit

Permalink
feat(rumqttd): non-consuming builder for Connection (#708)
Browse files Browse the repository at this point in the history
  • Loading branch information
swanandx authored Sep 9, 2023
1 parent 0041bf5 commit 27282a8
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 18 deletions.
1 change: 1 addition & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Will delay interval for MQTTv5 (#686)

### Changed
- Non-consuming builder pattern for constructing Connection

### Deprecated

Expand Down
9 changes: 5 additions & 4 deletions rumqttd/src/link/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,16 @@ impl<'a> LinkBuilder<'a> {
pub fn build(self) -> Result<(LinkTx, LinkRx, Notification), LinkError> {
// Connect to router
// Local connections to the router shall have access to all subscriptions
let connection = Connection::new(
let mut connection = Connection::new(
self.tenant_id,
self.client_id.to_owned(),
self.clean_session,
self.last_will,
self.last_will_properties,
self.dynamic_filters,
self.topic_alias_max,
);

connection
.last_will(self.last_will, self.last_will_properties)
.topic_alias_max(self.topic_alias_max);
let incoming = Incoming::new(connection.client_id.to_owned());
let (outgoing, link_rx) = Outgoing::new(connection.client_id.to_owned());
let outgoing_data_buffer = outgoing.buffer();
Expand Down
34 changes: 21 additions & 13 deletions rumqttd/src/router/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ impl Connection {
tenant_id: Option<String>,
client_id: String,
clean: bool,
last_will: Option<LastWill>,
last_will_properties: Option<LastWillProperties>,
dynamic_filters: bool,
topic_alias_max: u16,
) -> Connection {
// Change client id to -> tenant_id.client_id and derive topic path prefix
// to validate topics
Expand All @@ -57,27 +54,38 @@ impl Connection {
None => (client_id, None),
};

// if topic_alias_max is 0, that means client doesn't want to use / support topic alias
let broker_topic_aliases = if topic_alias_max == 0 {
None
} else {
Some(BrokerAliases::new(topic_alias_max))
};

Connection {
client_id,
tenant_prefix,
dynamic_filters,
clean,
subscriptions: HashSet::default(),
last_will,
last_will_properties,
last_will: None,
last_will_properties: None,
events: ConnectionEvents::default(),
topic_aliases: HashMap::new(),
broker_topic_aliases,
broker_topic_aliases: None,
subscription_ids: HashMap::new(),
}
}

pub fn topic_alias_max(&mut self, max: u16) -> &mut Connection {
// if topic_alias_max is 0, that means client doesn't want to use / support topic alias
if max > 0 {
self.broker_topic_aliases = Some(BrokerAliases::new(max));
}
self
}

pub fn last_will(
&mut self,
will: Option<LastWill>,
props: Option<LastWillProperties>,
) -> &mut Connection {
self.last_will = will;
self.last_will_properties = props;
self
}
}

#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/router/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ impl Router {
let tenant_prefix = tenant_id.map(|id| format!("/tenants/{id}/"));

let Some((will, will_props)) = self.last_wills.remove(&client_id) else {
return
return;
};

let publish = Publish {
Expand Down

0 comments on commit 27282a8

Please sign in to comment.