Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription IDs support in broker #632

Merged
merged 6 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions rumqttc/examples/subscription_ids.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use rumqttc::v5::mqttbytes::v5::SubscribeProperties;
use rumqttc::v5::mqttbytes::QoS;
use tokio::{task, time};

use rumqttc::v5::{AsyncClient, MqttOptions};
use std::error::Error;
use std::time::Duration;

#[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
requests(client).await;
time::sleep(Duration::from_secs(3)).await;
});

while let Ok(event) = eventloop.poll().await {
println!("{:?}", event);
}

Ok(())
}

async fn requests(client: AsyncClient) {
let props = SubscribeProperties {
id: Some(1),
user_properties: vec![],
};

client
.subscribe_with_properties("hello/world", QoS::AtMostOnce, props)
.await
.unwrap();

let props = SubscribeProperties {
id: Some(2),
user_properties: vec![],
};

client
.subscribe_with_properties("hello/#", QoS::AtMostOnce, props)
.await
.unwrap();

time::sleep(Duration::from_millis(500)).await;
// we will receive two publishes
// one due to hello/world and other due to hello/#
// both will have respective subscription ids
client
.publish(
"hello/world",
QoS::AtMostOnce,
false,
"both having subscription IDs!",
)
.await
.unwrap();

time::sleep(Duration::from_millis(500)).await;
client.unsubscribe("hello/#").await.unwrap();
client.subscribe("hello/#", QoS::AtMostOnce).await.unwrap();
time::sleep(Duration::from_millis(500)).await;

// we will receive two publishes
// but only one will have subscription ID
// cuz we unsubscribed to hello/# and then
// subscribed without properties!
client
.publish(
"hello/world",
QoS::AtMostOnce,
false,
"Only one with subscription ID!",
)
.await
.unwrap();
}
1 change: 1 addition & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Subscription IDs in v5 publish packets (#632)

### Changed

Expand Down
3 changes: 3 additions & 0 deletions rumqttd/src/router/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct Connection {
pub(crate) topic_aliases: HashMap<u16, Topic>,
/// Topic aliases used by broker
pub(crate) broker_topic_aliases: Option<BrokerAliases>,
/// subscription IDs for a connection
pub(crate) subscription_ids: HashMap<Filter, usize>,
}

impl Connection {
Expand Down Expand Up @@ -68,6 +70,7 @@ impl Connection {
events: ConnectionEvents::default(),
topic_aliases: HashMap::new(),
broker_topic_aliases,
subscription_ids: HashMap::new(),
}
}
}
Expand Down
62 changes: 50 additions & 12 deletions rumqttd/src/router/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use thiserror::Error;
use tracing::{debug, error, info, trace, warn};

use super::alertlog::{Alert, AlertLog};
use super::connection::BrokerAliases;
use super::graveyard::Graveyard;
use super::iobufs::{Incoming, Outgoing};
use super::logs::{AckLog, DataLog};
Expand Down Expand Up @@ -615,7 +614,7 @@ impl Router {
);
};
}
Packet::Subscribe(subscribe, _) => {
Packet::Subscribe(subscribe, props) => {
let mut return_codes = Vec::new();
let pkid = subscribe.pkid;
// let len = s.len();
Expand All @@ -637,9 +636,24 @@ impl Router {

let filter = &f.path;
let qos = f.qos;
let subscription_id = props.as_ref().and_then(|p| p.id);

if subscription_id == Some(0) {
error!("Subscription identifier can't be 0");
disconnect = true;
disconnect_reason = Some(DisconnectReasonCode::ProtocolError);
break;
}

let (idx, cursor) = self.datalog.next_native_offset(filter);
self.prepare_filter(id, cursor, idx, filter.clone(), qos as u8);
self.prepare_filter(
id,
cursor,
idx,
filter.clone(),
qos as u8,
subscription_id,
);
self.datalog
.handle_retained_messages(filter, &mut self.notifications);

Expand Down Expand Up @@ -689,6 +703,9 @@ impl Router {
broker_aliases.remove_alias(filter);
}

// remove the subscription id
connection.subscription_ids.remove(filter);

let unsuback = UnsubAck {
pkid,
// reasons are used in MQTTv5
Expand Down Expand Up @@ -856,6 +873,7 @@ impl Router {
filter_idx: FilterIdx,
filter: String,
qos: u8,
subscription_id: Option<usize>,
) {
// Add connection id to subscription list
match self.subscription_map.get_mut(&filter) {
Expand All @@ -872,6 +890,12 @@ impl Router {
// Prepare consumer to pull data in case of subscription
let connection = self.connections.get_mut(id).unwrap();

if let Some(subscription_id) = subscription_id {
connection
.subscription_ids
.insert(filter.clone(), subscription_id);
}

if connection.subscriptions.insert(filter.clone()) {
let request = DataRequest {
filter: filter.clone(),
Expand Down Expand Up @@ -919,7 +943,6 @@ impl Router {
ack_device_data(ackslog, outgoing);

let connection = &mut self.connections[id];
let broker_topic_aliases = &mut connection.broker_topic_aliases;

// A new connection's tracker is always initialized with acks request.
// A subscribe will register data request.
Expand All @@ -937,13 +960,7 @@ impl Router {
}
};

match forward_device_data(
&mut request,
datalog,
outgoing,
alertlog,
broker_topic_aliases,
) {
match forward_device_data(&mut request, datalog, outgoing, alertlog, connection) {
ConsumeStatus::BufferFull => {
requests.push_back(request);
self.scheduler.pause(id, PauseReason::Busy);
Expand Down Expand Up @@ -1068,6 +1085,17 @@ fn append_to_commitlog(
p.topic_alias.take()
});

// TODO: broker should properly send the disconnect packet!
if properties
.as_ref()
.is_some_and(|p| !p.subscription_identifiers.is_empty())
{
error!("A PUBLISH packet sent from a Client to a Server MUST NOT contain a Subscription Identifier");
return Err(RouterError::Disconnect(
DisconnectReasonCode::MalformedPacket,
));
}

if let Some(alias) = topic_alias {
validate_and_set_topic_alias(&mut publish, connection, alias)?;
};
Expand Down Expand Up @@ -1210,7 +1238,7 @@ fn forward_device_data(
datalog: &DataLog,
outgoing: &mut Outgoing,
alertlog: &mut AlertLog,
broker_topic_aliases: &mut Option<BrokerAliases>,
connection: &mut Connection,
) -> ConsumeStatus {
let span = tracing::info_span!("outgoing_publish", client_id = outgoing.client_id);
let _guard = span.enter();
Expand Down Expand Up @@ -1282,6 +1310,7 @@ fn forward_device_data(
return ConsumeStatus::FilterCaughtup;
}

let broker_topic_aliases = &mut connection.broker_topic_aliases;
let mut topic_alias = broker_topic_aliases
.as_ref()
.and_then(|aliases| aliases.get_alias(&request.filter));
Expand All @@ -1295,6 +1324,8 @@ fn forward_device_data(
.and_then(|broker_aliases| broker_aliases.set_new_alias(&request.filter))
}

let subscription_id = connection.subscription_ids.get(&request.filter);

// Fill and notify device data
let forwards = publishes
.into_iter()
Expand All @@ -1313,6 +1344,13 @@ fn forward_device_data(
publish.topic.clear()
}

if let Some(&subscription_id) = subscription_id {
// create new props if not already exists
let mut props = properties.unwrap_or_default();
props.subscription_identifiers.push(subscription_id);
properties = Some(props);
}

Forward {
cursor: offset,
size: 0,
Expand Down