Skip to content

Commit

Permalink
Added topic alias support.
Browse files Browse the repository at this point in the history
TopicAlias lifetime is the same as Session lifetime
It is different from MQTT v5 spec but practical choice.
See
https://lists.oasis-open.org/archives/mqtt-comment/202009/msg00000.html

Related fix:

When topic is empty and no topic alias entry is found, then
protocol_error.
The broker send DISCONNECT packet with protocol_error reason code.

process_disconnect implementation was simply doesn't call
`on_mqtt_message_processed()` but it was bad.
It caused exit from ioc.run() loop because the next async_read was not
called.
The correct behavior is calling `on_mqtt_message_processed()` and close
socket. The endpoint can detect the socket close by on_error().

Fixed topic alias.
  • Loading branch information
redboltz committed Sep 10, 2020
1 parent b913e28 commit ffdc21e
Show file tree
Hide file tree
Showing 26 changed files with 1,021 additions and 55 deletions.
8 changes: 3 additions & 5 deletions include/mqtt/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1389,20 +1389,18 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
}

static optional<session_expiry_interval_t> get_session_expiry_interval_by_props(v5::properties const& props) {
bool finish = false;
optional<session_expiry_interval_t > val;
optional<session_expiry_interval_t> val;
for (auto const& prop : props) {
MQTT_NS::visit(
make_lambda_visitor(
[&finish, &val](v5::property::session_expiry_interval const& p) {
[&val](v5::property::session_expiry_interval const& p) {
val = p.val();
finish = true;
},
[](auto&&) {
}
), prop
);
if (finish) break;
if (val) break;
}
return val;
}
Expand Down
1 change: 1 addition & 0 deletions include/mqtt/constant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace MQTT_NS {

static constexpr session_expiry_interval_t const session_never_expire = 0xffffffffUL;
static constexpr topic_alias_t const topic_alias_max = 0xffff;

} // namespace MQTT_NS

Expand Down
97 changes: 90 additions & 7 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include <mqtt/error_code.hpp>
#include <mqtt/log.hpp>
#include <mqtt/variant_visit.hpp>
#include <mqtt/topic_alias_recv.hpp>

#if defined(MQTT_USE_WS)
#include <mqtt/ws_endpoint.hpp>
Expand Down Expand Up @@ -990,9 +991,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
<< MQTT_ADD_VALUE(address, this)
<< "force_disconnect";

connected_ = false;
mqtt_connected_ = false;
shutdown_from_client(*socket_);
shutdown(socket());
}

/**
Expand Down Expand Up @@ -4499,6 +4498,28 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
pingresp_timeout_ = mqtt::force_move(tim);
}

/**
* @brief get topic alias recv container.
* @return a copy of topic alias recv container
*
* This function for dump/restore topic alias recv container.
*/
topic_alias_recv_map_t get_topic_alias_recv_container() const {
LockGuard<Mutex> lck (topic_alias_recv_mtx_);
return topic_alias_recv_;
}

/**
* @brief restore topic alias recv container.
* @param con topic alias recv container to restore
*
* This function for dump/restore topic alias recv container.
*/
void restore_topic_alias_recv_container(topic_alias_recv_map_t con) {
LockGuard<Mutex> lck (topic_alias_recv_mtx_);
topic_alias_recv_ = MQTT_NS::force_move(con);
}

protected:

/**
Expand Down Expand Up @@ -4568,9 +4589,15 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}

void clear_session_data() {
LockGuard<Mutex> lck (store_mtx_);
store_.clear();
packet_id_.clear();
{
LockGuard<Mutex> lck (store_mtx_);
store_.clear();
packet_id_.clear();
}
{
LockGuard<Mutex> lck (topic_alias_recv_mtx_);
clear_topic_alias(topic_alias_recv_);
}
}

private:
Expand Down Expand Up @@ -4602,7 +4629,10 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}

template <typename T>
void shutdown_from_client(T& socket) {
void shutdown(T& socket) {
connected_ = false;
mqtt_connected_ = false;

boost::system::error_code ec;
socket.lowest_layer().close(ec);
}
Expand Down Expand Up @@ -7045,6 +7075,31 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}
break;
case protocol_version::v5:
if (info.topic_name.empty()) {
if (auto topic_alias = get_topic_alias_by_props(info.props)) {
auto topic_name = [&] {
LockGuard<Mutex> lck (topic_alias_recv_mtx_);
return find_topic_by_alias(topic_alias_recv_, topic_alias.value());
}();
if (topic_name.empty()) {
MQTT_LOG("mqtt_cb", error)
<< MQTT_ADD_VALUE(address, this)
<< "no matching topic alias: "
<< topic_alias.value();
call_protocol_error_handlers();
return false;
}
else {
info.topic_name = allocate_buffer(topic_name);
}
}
}
else {
if (auto topic_alias = get_topic_alias_by_props(info.props)) {
LockGuard<Mutex> lck (topic_alias_recv_mtx_);
register_topic_alias(topic_alias_recv_, info.topic_name, topic_alias.value());
}
}
if (on_v5_publish(
info.packet_id,
publish_options(fixed_header_),
Expand Down Expand Up @@ -8585,6 +8640,8 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
default:
BOOST_ASSERT(false);
}
shutdown(*socket_);
on_mqtt_message_processed(force_move(session_life_keeper));
break;
}
}
Expand Down Expand Up @@ -9928,6 +9985,29 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
);
}

static optional<topic_alias_t> get_topic_alias_by_prop(v5::property_variant const& prop) {
optional<topic_alias_t> val;
MQTT_NS::visit(
make_lambda_visitor(
[&val](v5::property::topic_alias const& p) {
val = p.val();
},
[](auto&&) {
}
), prop
);
return val;
}

static optional<topic_alias_t> get_topic_alias_by_props(v5::properties const& props) {
for (auto const& prop : props) {
if (auto val = get_topic_alias_by_prop(prop)) {
return val;
}
}
return nullopt;
}

protected:
// Ensure that only code that knows the *exact* type of an object
// inheriting from this abstract base class can destruct it.
Expand Down Expand Up @@ -9976,6 +10056,9 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
std::chrono::steady_clock::duration pingresp_timeout_ = std::chrono::steady_clock::duration::zero();
as::steady_timer tim_pingresp_;
bool tim_pingresp_set_ = false;

mutable Mutex topic_alias_recv_mtx_;
topic_alias_recv_map_t topic_alias_recv_;
};

} // namespace MQTT_NS
Expand Down
1 change: 1 addition & 0 deletions include/mqtt/setup_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ void setup_log(severity_level threshold = severity_level::warning) {
{
{ "mqtt_api", threshold },
{ "mqtt_cb", threshold },
{ "mqtt_impl", threshold },
}
);
}
Expand Down
70 changes: 70 additions & 0 deletions include/mqtt/topic_alias_recv.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright Takatoshi Kondo 2020
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#if !defined(MQTT_TOPIC_ALIAS_RECV_HPP)
#define MQTT_TOPIC_ALIAS_RECV_HPP

#include <string>
#include <map>
#include <array>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>

#include <mqtt/string_view.hpp>
#include <mqtt/constant.hpp>
#include <mqtt/type.hpp>
#include <mqtt/log.hpp>

namespace MQTT_NS {

using topic_alias_recv_map_t = std::map<topic_alias_t, std::string>;

inline void register_topic_alias(topic_alias_recv_map_t& m, string_view topic, topic_alias_t alias) {
BOOST_ASSERT(alias > 0 && alias <= topic_alias_max);

MQTT_LOG("mqtt_impl", info)
<< MQTT_ADD_VALUE(address, &m)
<< "register_topic_alias"
<< " topic:" << topic
<< " alias:" << alias;

if (topic.empty()) {
m.erase(alias);
}
else {
m[alias] = std::string(topic); // overwrite
}
}

inline std::string find_topic_by_alias(topic_alias_recv_map_t const& m, topic_alias_t alias) {
BOOST_ASSERT(alias > 0 && alias <= topic_alias_max);

std::string topic;
auto it = m.find(alias);
if (it != m.end()) topic = it->second;

MQTT_LOG("mqtt_impl", info)
<< MQTT_ADD_VALUE(address, &m)
<< "find_topic_by_alias"
<< " alias:" << alias
<< " topic:" << topic;

return topic;
}

inline void clear_topic_alias(topic_alias_recv_map_t& m) {
MQTT_LOG("mqtt_impl", info)
<< MQTT_ADD_VALUE(address, &m)
<< "clear_topic_alias";

m.clear();
}

} // namespace MQTT_NS

#endif // MQTT_TOPIC_ALIAS_RECV_HPP
1 change: 1 addition & 0 deletions include/mqtt/type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace MQTT_NS {

using session_expiry_interval_t = std::uint32_t;
using topic_alias_t = std::uint16_t;

} // namespace MQTT_NS

Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ IF (MQTT_TEST_1)
LIST (APPEND check_PROGRAMS
connect.cpp
underlying_timeout.cpp
topic_alias_recv.cpp
)
ENDIF ()

Expand Down
6 changes: 6 additions & 0 deletions test/as_buffer_async_pubsub_1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ BOOST_AUTO_TEST_SUITE(test_as_buffer_async_pubsub_1)
BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos0 ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

packet_id_t pid_sub;
Expand Down Expand Up @@ -233,6 +234,7 @@ BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos0 ) {
BOOST_AUTO_TEST_CASE( pub_qos1_sub_qos0 ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

packet_id_t pid_pub;
Expand Down Expand Up @@ -459,6 +461,7 @@ BOOST_AUTO_TEST_CASE( pub_qos1_sub_qos0 ) {
BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos0 ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

packet_id_t pid_pub;
Expand Down Expand Up @@ -687,6 +690,7 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos0 ) {
BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos1 ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

packet_id_t pid_sub;
Expand Down Expand Up @@ -900,6 +904,7 @@ BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos1 ) {
BOOST_AUTO_TEST_CASE( pub_qos1_sub_qos1 ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

packet_id_t pid_pub;
Expand Down Expand Up @@ -1124,6 +1129,7 @@ BOOST_AUTO_TEST_CASE( pub_qos1_sub_qos1 ) {
BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos1 ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

packet_id_t pid_pub;
Expand Down
6 changes: 6 additions & 0 deletions test/as_buffer_async_pubsub_2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ BOOST_AUTO_TEST_SUITE(test_as_buffer_async_pubsub_2)
BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos2 ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

std::uint16_t pid_sub;
Expand Down Expand Up @@ -237,6 +238,7 @@ BOOST_AUTO_TEST_CASE( pub_qos0_sub_qos2 ) {
BOOST_AUTO_TEST_CASE( pub_qos1_sub_qos2 ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

std::uint16_t pid_pub;
Expand Down Expand Up @@ -470,6 +472,7 @@ BOOST_AUTO_TEST_CASE( pub_qos1_sub_qos2 ) {
BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2 ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

std::uint16_t pid_pub;
Expand Down Expand Up @@ -705,6 +708,7 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2 ) {
BOOST_AUTO_TEST_CASE( publish_function ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

std::uint16_t pid_sub;
Expand Down Expand Up @@ -917,6 +921,7 @@ BOOST_AUTO_TEST_CASE( publish_function ) {
BOOST_AUTO_TEST_CASE( publish_function_buffer ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

std::uint16_t pid_sub;
Expand Down Expand Up @@ -1123,6 +1128,7 @@ BOOST_AUTO_TEST_CASE( publish_function_buffer ) {
BOOST_AUTO_TEST_CASE( publish_dup_function ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& /*b*/) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
c->set_client_id("cid1");
c->set_clean_session(true);

std::uint16_t pid_sub;
Expand Down
Loading

0 comments on commit ffdc21e

Please sign in to comment.