Skip to content

Commit

Permalink
Moved shared_subscription parse to SDK part.
Browse files Browse the repository at this point in the history
It is a breaking change for brokers (servers).
I introduced the breaking change by the following reason.

1. It is a part of MQTT protocol. It can avoid the same logic
imprementation by all broker developer.
2. I have a plan that adding protocol error checking more in the SDK.
   For example, the combination of share name and NL:1 is a protocol
   error.
   In order to ckeck these kind of error, I need to parse share name
   at the SDK side.
  • Loading branch information
redboltz committed Dec 11, 2020
1 parent 88d0592 commit e515e8f
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 80 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# MQTT client/server for C++14 based on Boost.Asio

Version 9.0.0 [![Actions Status](https://github.com/redboltz/mqtt_cpp/workflows/CI/badge.svg)](https://github.com/redboltz/mqtt_cpp/actions)[![Build Status](https://dev.azure.com/redboltz/redboltz/_apis/build/status/redboltz.mqtt_cpp?branchName=master)](https://dev.azure.com/redboltz/redboltz/_build/latest?definitionId=6&branchName=master)[![codecov](https://codecov.io/gh/redboltz/mqtt_cpp/branch/master/graph/badge.svg)](https://codecov.io/gh/redboltz/mqtt_cpp)
Version 10.0.0 [![Actions Status](https://github.com/redboltz/mqtt_cpp/workflows/CI/badge.svg)](https://github.com/redboltz/mqtt_cpp/actions)[![Build Status](https://dev.azure.com/redboltz/redboltz/_apis/build/status/redboltz.mqtt_cpp?branchName=master)](https://dev.azure.com/redboltz/redboltz/_build/latest?definitionId=6&branchName=master)[![codecov](https://codecov.io/gh/redboltz/mqtt_cpp/branch/master/graph/badge.svg)](https://codecov.io/gh/redboltz/mqtt_cpp)

Important note https://github.com/redboltz/mqtt_cpp/wiki/News.

Expand Down
59 changes: 27 additions & 32 deletions include/mqtt/broker/broker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <mqtt/broker/retained_messages.hpp>

#include <mqtt/broker/retained_topic_map.hpp>
#include <mqtt/broker/shared_subscriptions.hpp>
#include <mqtt/broker/shared_target_impl.hpp>

MQTT_BROKER_NS_BEGIN
Expand Down Expand Up @@ -556,7 +555,7 @@ class broker_t {
ep.set_subscribe_handler(
[this, wp]
(packet_id_t packet_id,
std::vector<std::tuple<buffer, subscribe_options>> entries) {
std::vector<subscribe_entry> entries) {
con_sp_t sp = wp.lock();
BOOST_ASSERT(sp);
return subscribe_handler(
Expand All @@ -570,7 +569,7 @@ class broker_t {
ep.set_v5_subscribe_handler(
[this, wp]
(packet_id_t packet_id,
std::vector<std::tuple<buffer, subscribe_options>> entries,
std::vector<subscribe_entry> entries,
v5::properties props
) {
con_sp_t sp = wp.lock();
Expand All @@ -586,29 +585,29 @@ class broker_t {
ep.set_unsubscribe_handler(
[this, wp]
(packet_id_t packet_id,
std::vector<buffer> topics) {
std::vector<unsubscribe_entry> entries) {
con_sp_t sp = wp.lock();
BOOST_ASSERT(sp);
return unsubscribe_handler(
force_move(sp),
packet_id,
force_move(topics),
force_move(entries),
v5::properties{}
);
}
);
ep.set_v5_unsubscribe_handler(
[this, wp]
(packet_id_t packet_id,
std::vector<buffer> topics,
std::vector<unsubscribe_entry> entries,
v5::properties props
) {
con_sp_t sp = wp.lock();
BOOST_ASSERT(sp);
return unsubscribe_handler(
force_move(sp),
packet_id,
force_move(topics),
force_move(entries),
force_move(props)
);
}
Expand Down Expand Up @@ -1270,7 +1269,7 @@ class broker_t {
bool subscribe_handler(
con_sp_t spep,
packet_id_t packet_id,
std::vector<std::tuple<buffer, subscribe_options>> entries,
std::vector<subscribe_entry> entries,
v5::properties props) {

auto& ep = *spep;
Expand Down Expand Up @@ -1336,20 +1335,18 @@ class broker_t {
case protocol_version::v3_1_1: {
std::vector<suback_return_code> res;
res.reserve(entries.size());
for (auto const& e : entries) {
auto sn_tf = parse_shared_subscription(std::get<0>(e));
subscribe_options subopts = std::get<1>(e);
res.emplace_back(qos_to_suback_return_code(subopts.get_qos())); // converts to granted_qos_x
for (auto& e : entries) {
res.emplace_back(qos_to_suback_return_code(e.subopts.get_qos())); // converts to granted_qos_x
ssr.get().subscribe(
force_move(sn_tf.share_name),
sn_tf.topic_filter,
subopts,
force_move(e.share_name),
e.topic_filter,
e.subopts,
[&] {
retains_.find(
sn_tf.topic_filter,
e.topic_filter,
[&](retain_t const& r) {
retain_deliver.emplace_back(
[&publish_proc, &r, qos_value = subopts.get_qos(), sid] {
[&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
publish_proc(r, qos_value, sid);
}
);
Expand All @@ -1370,20 +1367,18 @@ class broker_t {

std::vector<v5::suback_reason_code> res;
res.reserve(entries.size());
for (auto const& e : entries) {
auto sn_tf = parse_shared_subscription(std::get<0>(e));
subscribe_options subopts = std::get<1>(e);
res.emplace_back(v5::qos_to_suback_reason_code(subopts.get_qos())); // converts to granted_qos_x
for (auto& e : entries) {
res.emplace_back(v5::qos_to_suback_reason_code(e.subopts.get_qos())); // converts to granted_qos_x
ssr.get().subscribe(
force_move(sn_tf.share_name),
sn_tf.topic_filter,
subopts,
force_move(e.share_name),
e.topic_filter,
e.subopts,
[&] {
retains_.find(
sn_tf.topic_filter,
e.topic_filter,
[&](retain_t const& r) {
retain_deliver.emplace_back(
[&publish_proc, &r, qos_value = subopts.get_qos(), sid] {
[&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
publish_proc(r, qos_value, sid);
}
);
Expand Down Expand Up @@ -1411,7 +1406,7 @@ class broker_t {
bool unsubscribe_handler(
con_sp_t spep,
packet_id_t packet_id,
std::vector<buffer> topic_filters,
std::vector<unsubscribe_entry> entries,
v5::properties props) {

auto& ep = *spep;
Expand Down Expand Up @@ -1439,9 +1434,8 @@ class broker_t {
// For each subscription that this connection has
// Compare against the list of topic filters, and remove
// the subscription if the topic filter is in the list.
for (auto const& whole_topic_filter : topic_filters) {
auto sn_tf = parse_shared_subscription(whole_topic_filter);
ssr.get().unsubscribe(sn_tf.share_name, sn_tf.topic_filter);
for (auto const& e : entries) {
ssr.get().unsubscribe(e.share_name, e.topic_filter);
}

switch (ep.get_protocol_version()) {
Expand All @@ -1453,7 +1447,7 @@ class broker_t {
ep.unsuback(
packet_id,
std::vector<v5::unsuback_reason_code>(
topic_filters.size(),
entries.size(),
v5::unsuback_reason_code::success
),
unsuback_props_
Expand Down Expand Up @@ -1516,7 +1510,8 @@ class broker_t {
}
};

std::set<share_name_topic_filter> sent;
// share_name topic_filter
std::set<std::tuple<string_view, string_view>> sent;

subs_map_.modify(
topic,
Expand Down
1 change: 0 additions & 1 deletion include/mqtt/broker/session_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <mqtt/broker/tags.hpp>
#include <mqtt/broker/inflight_message.hpp>
#include <mqtt/broker/offline_message.hpp>
#include <mqtt/broker/shared_subscriptions.hpp>

MQTT_BROKER_NS_BEGIN

Expand Down
43 changes: 21 additions & 22 deletions include/mqtt/callable_overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,12 @@ struct callable_overlay final : public Impl
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349801<BR>
* 3.8.2 Variable header
* @param entries
* Collection of a pair of Topic Filter and QoS.<BR>
* Collection of Share Name, Topic Filter, and QoS.<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349802<BR>
* @return if the handler returns true, then continue receiving, otherwise quit.
*/
MQTT_ALWAYS_INLINE bool on_subscribe(packet_id_t packet_id,
std::vector<std::tuple<buffer,
subscribe_options>> entries) noexcept override final {
std::vector<subscribe_entry> entries) noexcept override final {
return ! h_subscribe_
|| h_subscribe_(packet_id, force_move(entries));
}
Expand Down Expand Up @@ -244,14 +243,14 @@ struct callable_overlay final : public Impl
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349810<BR>
* 3.10.2 Variable header
* @param topics
* Collection of Topic Filters<BR>
* Collection of Share Name and Topic Filter<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc384800448<BR>
* @return if the handler returns true, then continue receiving, otherwise quit.
*/
MQTT_ALWAYS_INLINE bool on_unsubscribe(packet_id_t packet_id,
std::vector<buffer> topics) noexcept override final {
std::vector<unsubscribe_entry> entries) noexcept override final {
return ! h_unsubscribe_
|| h_unsubscribe_(packet_id, force_move(topics));
|| h_unsubscribe_(packet_id, force_move(entries));
}

/**
Expand Down Expand Up @@ -497,7 +496,7 @@ struct callable_overlay final : public Impl
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901163<BR>
* 3.8.2 Variable header
* @param entries
* Collection of a pair of Topic Filter and QoS.<BR>
* Collection of Share Name, Topic Filter, and QoS.<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901168<BR>
* @param props
* Properties<BR>
Expand All @@ -506,7 +505,7 @@ struct callable_overlay final : public Impl
* @return if the handler returns true, then continue receiving, otherwise quit.
*/
MQTT_ALWAYS_INLINE bool on_v5_subscribe(packet_id_t packet_id,
std::vector<std::tuple<buffer, subscribe_options>> entries,
std::vector<subscribe_entry> entries,
v5::properties props) noexcept override final {
return ! h_v5_subscribe_
|| h_v5_subscribe_(packet_id, force_move(entries), force_move(props));
Expand Down Expand Up @@ -539,8 +538,8 @@ struct callable_overlay final : public Impl
* @param packet_id packet identifier<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901181<BR>
* 3.10.2 Variable header
* @param topics
* Collection of Topic Filters<BR>
* @param entries
* Collection of Share Name and Topic Filter<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901185<BR>
* 3.10.3 UNSUBSCRIBE Payload
* @param props
Expand All @@ -550,10 +549,10 @@ struct callable_overlay final : public Impl
* @return if the handler returns true, then continue receiving, otherwise quit.
*/
MQTT_ALWAYS_INLINE bool on_v5_unsubscribe(packet_id_t packet_id,
std::vector<buffer> topics,
std::vector<unsubscribe_entry> entries,
v5::properties props) noexcept override final {
return ! h_v5_unsubscribe_
|| h_v5_unsubscribe_(packet_id, force_move(topics), force_move(props));
|| h_v5_unsubscribe_(packet_id, force_move(entries), force_move(props));
}

/**
Expand Down Expand Up @@ -900,12 +899,12 @@ struct callable_overlay final : public Impl
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349801<BR>
* 3.8.2 Variable header
* @param entries
* Collection of a pair of Topic Filter and QoS.<BR>
* Collection of Share Name, Topic Filter, and QoS.<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349802<BR>
* @return if the handler returns true, then continue receiving, otherwise quit.
*/
using subscribe_handler = std::function<bool(packet_id_t packet_id,
std::vector<std::tuple<buffer, subscribe_options>> entries)>;
std::vector<subscribe_entry> entries)>;

/**
* @brief Suback handler
Expand All @@ -926,13 +925,13 @@ struct callable_overlay final : public Impl
* @param packet_id packet identifier<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349810<BR>
* 3.10.2 Variable header
* @param topics
* Collection of Topic Filters<BR>
* @param entries
* Collection of Share Name and Topic Filter<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc384800448<BR>
* @return if the handler returns true, then continue receiving, otherwise quit.
*/
using unsubscribe_handler = std::function<bool(packet_id_t packet_id,
std::vector<buffer> topics)>;
std::vector<unsubscribe_entry> entries)>;

/**
* @brief Unsuback handler
Expand Down Expand Up @@ -1154,7 +1153,7 @@ struct callable_overlay final : public Impl
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901163<BR>
* 3.8.2 Variable header
* @param entries
* Collection of a pair of Topic Filter and QoS.<BR>
* Collection of Share Name, Topic Filter, and Subscribe Options.<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901168<BR>
* @param props
* Properties<BR>
Expand All @@ -1164,7 +1163,7 @@ struct callable_overlay final : public Impl
*/
using v5_subscribe_handler = std::function<
bool(packet_id_t packet_id,
std::vector<std::tuple<buffer, subscribe_options>> entries,
std::vector<subscribe_entry> entries,
v5::properties props)
>;

Expand Down Expand Up @@ -1194,8 +1193,8 @@ struct callable_overlay final : public Impl
* @param packet_id packet identifier<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901181<BR>
* 3.10.2 Variable header
* @param topics
* Collection of Topic Filters<BR>
* @param entries
* Collection of Share Name and Topic Filter<BR>
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901185<BR>
* 3.10.3 UNSUBSCRIBE Payload
* @param props
Expand All @@ -1206,7 +1205,7 @@ struct callable_overlay final : public Impl
*/
using v5_unsubscribe_handler = std::function<
bool(packet_id_t packet_id,
std::vector<buffer> topics,
std::vector<unsubscribe_entry> entries,
v5::properties props)
>;

Expand Down
Loading

0 comments on commit e515e8f

Please sign in to comment.