Skip to content

Commit

Permalink
Removed session expiry timer from client.
Browse files Browse the repository at this point in the history
I thought that the client needs to have session expiry timer to erase
inflight messages. So I implemented the session expiry timer for the
client.

However, I found the following clause in the spec.

---

https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901078

If the Server accepts a connection with Clean Start set to 1, the Server
MUST set Session Present to 0 in the CONNACK packet in addition to setting
a 0x00 (Success) Reason Code in the CONNACK packet [MQTT-3.2.2-2].

https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048

The Client can avoid implementing its own Session expiry and instead rely on
the Session Present flag returned from the Server to determine if the Session
had expired. If the Client does implement its own Session expiry, it needs to
store the time at which the Session State will be deleted as part of its
Session State.

---

That means the client can remove the session expiry timer depends on
broker side session existance that is reported by session_present flag
on CONNACK packet.

The spac also said that as follows. It is the session state mismatching
between the client and the broker. If they have the timers individually,
then it could happen. It is a kind of clock skew.

Depending on session_present can automatically avoid this issue.
So, I decided removing from session expiry timer from the client and use
session_present flag to erase the inflight messages.
It is a better design.

---
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901078

If the Client does not have Session State and receives Session Present
set to 1 it MUST close the Network Connection [MQTT-3.2.2-4]. If it
wishes to restart with a new Session the Client can reconnect using
Clean Start set to 1.

If the Client does have Session State and receives Session Present set
to 0 it MUST discard its Session State if it continues with the Network
Connection [MQTT-3.2.2-5].

---
  • Loading branch information
redboltz committed Dec 18, 2020
1 parent 364be87 commit 3b085ae
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 113 deletions.
1 change: 0 additions & 1 deletion include/mqtt/callable_overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ struct callable_overlay final : public Impl
MQTT_ALWAYS_INLINE bool on_v5_connack(bool session_present,
v5::connect_reason_code reason_code,
v5::properties props) noexcept override final {
if (!base::on_v5_connack(session_present, reason_code, props)) return false;
return ! h_v5_connack_
|| h_v5_connack_(session_present, reason_code, force_move(props));
}
Expand Down
93 changes: 2 additions & 91 deletions include/mqtt/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
* After constructing a endpoint, the clean session is set to false.
*/
void set_clean_session(bool cs) {
base::clean_session_ = cs;
set_clean_start(cs);
}

/**
Expand All @@ -276,7 +276,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
* After constructing a endpoint, the clean start is set to false.
*/
void set_clean_start(bool cs) {
set_clean_session(cs);
base::clean_start_ = cs;
}

/**
Expand Down Expand Up @@ -747,7 +747,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
}
}
);
set_session_expiry_interval_on_disconnect(props);
base::disconnect(reason_code, force_move(props));
}
}
Expand All @@ -773,7 +772,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
) {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
if (base::connected()) {
set_session_expiry_interval_on_disconnect(props);
base::disconnect(reason_code, force_move(props));
}
}
Expand Down Expand Up @@ -850,7 +848,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
}
}
);
set_session_expiry_interval_on_disconnect(props);
base::async_disconnect(reason_code, force_move(props), force_move(func));
}
}
Expand Down Expand Up @@ -893,7 +890,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
async_handler_t func = async_handler_t()) {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
if (base::connected()) {
set_session_expiry_interval_on_disconnect(props);
base::async_disconnect(reason_code, force_move(props), force_move(func));
}
}
Expand Down Expand Up @@ -947,8 +943,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
,
path_(force_move(path))
#endif // defined(MQTT_USE_WS)
,
tim_session_expiry_(ioc_)
{
#if defined(MQTT_USE_TLS)
ctx_.set_verify_mode(tls::verify_peer);
Expand Down Expand Up @@ -1230,7 +1224,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
set_timer();
}
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0);
handshake_socket(*socket_, force_move(props), force_move(session_life_keeper));
}

Expand All @@ -1247,7 +1240,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
set_timer();
}
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0);
handshake_socket(*socket_, force_move(props), force_move(session_life_keeper), ec);
}

Expand Down Expand Up @@ -1294,7 +1286,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
set_timer();
}
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0);
async_handshake_socket(*socket_, force_move(props), force_move(session_life_keeper), force_move(func));
}
);
Expand Down Expand Up @@ -1338,92 +1329,14 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
set_timer();
}

static optional<session_expiry_interval_t> get_session_expiry_interval_by_props(v5::properties const& props) {
optional<session_expiry_interval_t> val;
for (auto const& prop : props) {
visit(
make_lambda_visitor(
[&val](v5::property::session_expiry_interval const& p) {
val = p.val();
},
[](auto&&) {
}
), prop
);
if (val) break;
}
return val;
}

void set_session_expiry_interval_on_disconnect(v5::properties const& props) {
auto sei = get_session_expiry_interval_by_props(props);
if (!sei) return;

if (session_expiry_interval_ == 0 && sei.value() != 0) {
MQTT_LOG("mqtt_api", warning)
<< MQTT_ADD_VALUE(address, this)
<< "session_expiry_interval was 0 but at disconnect "
<< sei.value()
<< " is set.";
}
session_expiry_interval_ = sei.value();
}

void set_session_expiry_timer() {
if (base::get_protocol_version() != protocol_version::v5) return;
if (session_expiry_interval_== session_never_expire) return;
if (session_expiry_interval_== 0) {
base::clear_session_data();
return;
}
tim_session_expiry_.expires_after(std::chrono::seconds(session_expiry_interval_));
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
tim_session_expiry_.async_wait(
[wp = force_move(wp)](error_code ec) {
if (auto sp = wp.lock()) {
if (!ec) {
sp->clear_session_data();
}
}
}
);

}

public:
void cancel_session_expiry_timer() {
tim_session_expiry_.cancel();
}

protected:
bool on_v5_connack(bool /*session_present*/,
v5::connect_reason_code /*reason_code*/,
v5::properties props) noexcept override {
cancel_session_expiry_timer();

// The current implementation is simply
// overwrite by broker's session expiry interval
//
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901082
// broker could return session expiry interval with CONNACK
// only when the client sent CONNECT withou session expiry interval.
// The spec doesn't say this rule violation case.
auto sei = get_session_expiry_interval_by_props(props);
if (sei) {
session_expiry_interval_ = sei.value();
}
return true;
}

void on_close() noexcept override {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
set_session_expiry_timer();
}

void on_error(error_code ec) noexcept override {
(void)ec;
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
set_session_expiry_timer();
}

// Ensure that only code that knows the *exact* type of an object
Expand Down Expand Up @@ -1474,8 +1387,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
#if defined(MQTT_USE_WS)
std::string path_;
#endif // defined(MQTT_USE_WS)
session_expiry_interval_t session_expiry_interval_ = 0;
as::steady_timer tim_session_expiry_;
};

inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
Expand Down
5 changes: 5 additions & 0 deletions include/mqtt/connect_flags.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace MQTT_NS {
namespace connect_flags {

constexpr char const clean_session = 0b00000010;
constexpr char const clean_start = 0b00000010;
constexpr char const will_flag = 0b00000100;
constexpr char const will_retain = 0b00100000;
constexpr char const password_flag = 0b01000000;
Expand All @@ -25,6 +26,10 @@ constexpr bool has_clean_session(char v) {
return (v & clean_session) != 0;
}

constexpr bool has_clean_start(char v) {
return (v & clean_start) != 0;
}

constexpr bool has_will_flag(char v) {
return (v & will_flag) != 0;
}
Expand Down
66 changes: 50 additions & 16 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @return clean session
*/
bool clean_session() const {
return clean_session_;
return clean_start();
}

/**
Expand All @@ -818,7 +818,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @return clean start
*/
bool clean_start() const {
return clean_session();
return clean_start_;
}

/**
Expand Down Expand Up @@ -6695,7 +6695,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
info.connect_flag = buf[i++];

info.keep_alive = make_uint16_t(buf[i], buf[i + 1]);
clean_session_ = connect_flags::has_clean_session(info.connect_flag);
clean_start_ = connect_flags::has_clean_start(info.connect_flag);

buf.remove_prefix(info.header_len); // consume buffer
if(version_ == protocol_version::v5) {
Expand Down Expand Up @@ -6946,7 +6946,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
force_move(info.will_payload),
connect_flags::has_will_retain(info.connect_flag) | connect_flags::will_qos(info.connect_flag))
: optional<will>(nullopt),
clean_session_,
clean_session(),
info.keep_alive
)
) {
Expand All @@ -6965,7 +6965,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
connect_flags::has_will_retain(info.connect_flag) | connect_flags::will_qos(info.connect_flag),
force_move(info.will_props))
: optional<will>(nullopt),
clean_session_,
clean_start(),
info.keep_alive,
force_move(info.props)
)
Expand Down Expand Up @@ -7099,7 +7099,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
) mutable {
switch (version_) {
case protocol_version::v3_1_1:
if(on_connack(info.session_present,
if (on_connack(info.session_present,
variant_get<connect_return_code>(info.reason_code))) {
on_mqtt_message_processed(force_move(session_life_keeper));
}
Expand All @@ -7122,11 +7122,42 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
if ( ( (0 == variant_idx(info.reason_code))
&& (connect_return_code::accepted == variant_get<connect_return_code>(info.reason_code)))
|| ( (1 == variant_idx(info.reason_code))
&& (v5::connect_reason_code::success == variant_get<v5::connect_reason_code>(info.reason_code)))) {
if (clean_session_) {
clear_session_data();
}
else {
&& (v5::connect_reason_code::success == variant_get<v5::connect_reason_code>(info.reason_code)))) {

// If session_present is false, then call clear_session_data().
// Here is the reason why it works well.
// ---
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901078
//
// If the Server accepts a connection with Clean Start set to 1, the Server
// MUST set Session Present to 0 in the CONNACK packet in addition to setting
// a 0x00 (Success) Reason Code in the CONNACK packet [MQTT-3.2.2-2].
//
//
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048
//
// The Client can avoid implementing its own Session expiry and instead rely on
// the Session Present flag returned from the Server to determine if the Session
// had expired. If the Client does implement its own Session expiry, it needs to
// store the time at which the Session State will be deleted as part of its
// Session State.
// ---
//
// Also it can avoid the following client side and broker side session state
// mismatch autonatically.
//
// ---
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901078
//
// If the Client does not have Session State and receives Session Present
// set to 1 it MUST close the Network Connection [MQTT-3.2.2-4]. If it
// wishes to restart with a new Session the Client can reconnect using
// Clean Start set to 1.
// If the Client does have Session State and receives Session Present set
// to 0 it MUST discard its Session State if it continues with the Network
// Connection [MQTT-3.2.2-5].
// ---
if (info.session_present) {
if (async_send_store_) {
// Until all stored messages are written to internal send buffer,
// disable further async reading of incoming packets..
Expand All @@ -7150,6 +7181,9 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}
send_store();
}
else {
clear_session_data();
}
}
connack_proc(force_move(session_life_keeper), force_move(info));
} break;
Expand Down Expand Up @@ -9050,7 +9084,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v3_1_1::connect_message(
keep_alive_sec,
force_move(client_id),
clean_session_,
clean_session(),
force_move(w),
force_move(user_name),
force_move(password)
Expand All @@ -9062,7 +9096,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v5::connect_message(
keep_alive_sec,
force_move(client_id),
clean_session_,
clean_start(),
force_move(w),
force_move(user_name),
force_move(password),
Expand Down Expand Up @@ -9546,7 +9580,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v3_1_1::connect_message(
keep_alive_sec,
force_move(client_id),
clean_session_,
clean_session(),
w,
force_move(user_name),
force_move(password)
Expand All @@ -9559,7 +9593,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v5::connect_message(
keep_alive_sec,
force_move(client_id),
clean_session_,
clean_start(),
w,
force_move(user_name),
force_move(password),
Expand Down Expand Up @@ -10322,7 +10356,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
~endpoint() = default;

protected:
bool clean_session_{false};
bool clean_start_{false};

private:
optional<MQTT_NS::socket> socket_;
Expand Down
1 change: 0 additions & 1 deletion include/mqtt/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class server_endpoint : public endpoint<Mutex, LockGuard, PacketIdBytes> {
public:
using endpoint<Mutex, LockGuard, PacketIdBytes>::endpoint;
protected:
bool on_v5_connack(bool, v5::connect_reason_code, v5::properties) noexcept override { return true; }
void on_pre_send() noexcept override {}
void on_close() noexcept override {}
void on_error(error_code /*ec*/) noexcept override {}
Expand Down
4 changes: 2 additions & 2 deletions include/mqtt/v5_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class connect_message {
connect_message(
std::uint16_t keep_alive_sec,
buffer client_id,
bool clean_session,
bool clean_start,
optional<will> w,
optional<buffer> user_name,
optional<buffer> password,
Expand Down Expand Up @@ -179,7 +179,7 @@ class connect_message {
remaining_length_ += property_length_buf_.size() + property_length_;

utf8string_check(client_id_);
if (clean_session) connect_flags_ |= connect_flags::clean_session;
if (clean_start) connect_flags_ |= connect_flags::clean_start;
if (user_name) {
utf8string_check(user_name.value());
connect_flags_ |= connect_flags::user_name_flag;
Expand Down
Loading

0 comments on commit 3b085ae

Please sign in to comment.