Skip to content

Commit

Permalink
Merge pull request #767 from redboltz/remove_client_side_session_expi…
Browse files Browse the repository at this point in the history
…ry_timer

Remove client side session expiry timer
  • Loading branch information
redboltz authored Dec 18, 2020
2 parents 364be87 + a5d9118 commit abb4f52
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 135 deletions.
1 change: 0 additions & 1 deletion include/mqtt/callable_overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ struct callable_overlay final : public Impl
MQTT_ALWAYS_INLINE bool on_v5_connack(bool session_present,
v5::connect_reason_code reason_code,
v5::properties props) noexcept override final {
if (!base::on_v5_connack(session_present, reason_code, props)) return false;
return ! h_v5_connack_
|| h_v5_connack_(session_present, reason_code, force_move(props));
}
Expand Down
93 changes: 2 additions & 91 deletions include/mqtt/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
* After constructing a endpoint, the clean session is set to false.
*/
void set_clean_session(bool cs) {
base::clean_session_ = cs;
set_clean_start(cs);
}

/**
Expand All @@ -276,7 +276,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
* After constructing a endpoint, the clean start is set to false.
*/
void set_clean_start(bool cs) {
set_clean_session(cs);
base::clean_start_ = cs;
}

/**
Expand Down Expand Up @@ -747,7 +747,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
}
}
);
set_session_expiry_interval_on_disconnect(props);
base::disconnect(reason_code, force_move(props));
}
}
Expand All @@ -773,7 +772,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
) {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
if (base::connected()) {
set_session_expiry_interval_on_disconnect(props);
base::disconnect(reason_code, force_move(props));
}
}
Expand Down Expand Up @@ -850,7 +848,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
}
}
);
set_session_expiry_interval_on_disconnect(props);
base::async_disconnect(reason_code, force_move(props), force_move(func));
}
}
Expand Down Expand Up @@ -893,7 +890,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
async_handler_t func = async_handler_t()) {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
if (base::connected()) {
set_session_expiry_interval_on_disconnect(props);
base::async_disconnect(reason_code, force_move(props), force_move(func));
}
}
Expand Down Expand Up @@ -947,8 +943,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
,
path_(force_move(path))
#endif // defined(MQTT_USE_WS)
,
tim_session_expiry_(ioc_)
{
#if defined(MQTT_USE_TLS)
ctx_.set_verify_mode(tls::verify_peer);
Expand Down Expand Up @@ -1230,7 +1224,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
set_timer();
}
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0);
handshake_socket(*socket_, force_move(props), force_move(session_life_keeper));
}

Expand All @@ -1247,7 +1240,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
set_timer();
}
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0);
handshake_socket(*socket_, force_move(props), force_move(session_life_keeper), ec);
}

Expand Down Expand Up @@ -1294,7 +1286,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
set_timer();
}
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0);
async_handshake_socket(*socket_, force_move(props), force_move(session_life_keeper), force_move(func));
}
);
Expand Down Expand Up @@ -1338,92 +1329,14 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
set_timer();
}

static optional<session_expiry_interval_t> get_session_expiry_interval_by_props(v5::properties const& props) {
optional<session_expiry_interval_t> val;
for (auto const& prop : props) {
visit(
make_lambda_visitor(
[&val](v5::property::session_expiry_interval const& p) {
val = p.val();
},
[](auto&&) {
}
), prop
);
if (val) break;
}
return val;
}

void set_session_expiry_interval_on_disconnect(v5::properties const& props) {
auto sei = get_session_expiry_interval_by_props(props);
if (!sei) return;

if (session_expiry_interval_ == 0 && sei.value() != 0) {
MQTT_LOG("mqtt_api", warning)
<< MQTT_ADD_VALUE(address, this)
<< "session_expiry_interval was 0 but at disconnect "
<< sei.value()
<< " is set.";
}
session_expiry_interval_ = sei.value();
}

void set_session_expiry_timer() {
if (base::get_protocol_version() != protocol_version::v5) return;
if (session_expiry_interval_== session_never_expire) return;
if (session_expiry_interval_== 0) {
base::clear_session_data();
return;
}
tim_session_expiry_.expires_after(std::chrono::seconds(session_expiry_interval_));
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
tim_session_expiry_.async_wait(
[wp = force_move(wp)](error_code ec) {
if (auto sp = wp.lock()) {
if (!ec) {
sp->clear_session_data();
}
}
}
);

}

public:
void cancel_session_expiry_timer() {
tim_session_expiry_.cancel();
}

protected:
bool on_v5_connack(bool /*session_present*/,
v5::connect_reason_code /*reason_code*/,
v5::properties props) noexcept override {
cancel_session_expiry_timer();

// The current implementation is simply
// overwrite by broker's session expiry interval
//
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901082
// broker could return session expiry interval with CONNACK
// only when the client sent CONNECT withou session expiry interval.
// The spec doesn't say this rule violation case.
auto sei = get_session_expiry_interval_by_props(props);
if (sei) {
session_expiry_interval_ = sei.value();
}
return true;
}

void on_close() noexcept override {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
set_session_expiry_timer();
}

void on_error(error_code ec) noexcept override {
(void)ec;
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
set_session_expiry_timer();
}

// Ensure that only code that knows the *exact* type of an object
Expand Down Expand Up @@ -1474,8 +1387,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
#if defined(MQTT_USE_WS)
std::string path_;
#endif // defined(MQTT_USE_WS)
session_expiry_interval_t session_expiry_interval_ = 0;
as::steady_timer tim_session_expiry_;
};

inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
Expand Down
5 changes: 5 additions & 0 deletions include/mqtt/connect_flags.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace MQTT_NS {
namespace connect_flags {

constexpr char const clean_session = 0b00000010;
constexpr char const clean_start = 0b00000010;
constexpr char const will_flag = 0b00000100;
constexpr char const will_retain = 0b00100000;
constexpr char const password_flag = 0b01000000;
Expand All @@ -25,6 +26,10 @@ constexpr bool has_clean_session(char v) {
return (v & clean_session) != 0;
}

constexpr bool has_clean_start(char v) {
return (v & clean_start) != 0;
}

constexpr bool has_will_flag(char v) {
return (v & will_flag) != 0;
}
Expand Down
66 changes: 50 additions & 16 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @return clean session
*/
bool clean_session() const {
return clean_session_;
return clean_start();
}

/**
Expand All @@ -818,7 +818,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @return clean start
*/
bool clean_start() const {
return clean_session();
return clean_start_;
}

/**
Expand Down Expand Up @@ -6695,7 +6695,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
info.connect_flag = buf[i++];

info.keep_alive = make_uint16_t(buf[i], buf[i + 1]);
clean_session_ = connect_flags::has_clean_session(info.connect_flag);
clean_start_ = connect_flags::has_clean_start(info.connect_flag);

buf.remove_prefix(info.header_len); // consume buffer
if(version_ == protocol_version::v5) {
Expand Down Expand Up @@ -6946,7 +6946,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
force_move(info.will_payload),
connect_flags::has_will_retain(info.connect_flag) | connect_flags::will_qos(info.connect_flag))
: optional<will>(nullopt),
clean_session_,
clean_session(),
info.keep_alive
)
) {
Expand All @@ -6965,7 +6965,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
connect_flags::has_will_retain(info.connect_flag) | connect_flags::will_qos(info.connect_flag),
force_move(info.will_props))
: optional<will>(nullopt),
clean_session_,
clean_start(),
info.keep_alive,
force_move(info.props)
)
Expand Down Expand Up @@ -7099,7 +7099,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
) mutable {
switch (version_) {
case protocol_version::v3_1_1:
if(on_connack(info.session_present,
if (on_connack(info.session_present,
variant_get<connect_return_code>(info.reason_code))) {
on_mqtt_message_processed(force_move(session_life_keeper));
}
Expand All @@ -7122,11 +7122,42 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
if ( ( (0 == variant_idx(info.reason_code))
&& (connect_return_code::accepted == variant_get<connect_return_code>(info.reason_code)))
|| ( (1 == variant_idx(info.reason_code))
&& (v5::connect_reason_code::success == variant_get<v5::connect_reason_code>(info.reason_code)))) {
if (clean_session_) {
clear_session_data();
}
else {
&& (v5::connect_reason_code::success == variant_get<v5::connect_reason_code>(info.reason_code)))) {

// If session_present is false, then call clear_session_data().
// Here is the reason why it works well.
// ---
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901078
//
// If the Server accepts a connection with Clean Start set to 1, the Server
// MUST set Session Present to 0 in the CONNACK packet in addition to setting
// a 0x00 (Success) Reason Code in the CONNACK packet [MQTT-3.2.2-2].
//
//
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048
//
// The Client can avoid implementing its own Session expiry and instead rely on
// the Session Present flag returned from the Server to determine if the Session
// had expired. If the Client does implement its own Session expiry, it needs to
// store the time at which the Session State will be deleted as part of its
// Session State.
// ---
//
// Also it can avoid the following client side and broker side session state
// mismatch autonatically.
//
// ---
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901078
//
// If the Client does not have Session State and receives Session Present
// set to 1 it MUST close the Network Connection [MQTT-3.2.2-4]. If it
// wishes to restart with a new Session the Client can reconnect using
// Clean Start set to 1.
// If the Client does have Session State and receives Session Present set
// to 0 it MUST discard its Session State if it continues with the Network
// Connection [MQTT-3.2.2-5].
// ---
if (info.session_present) {
if (async_send_store_) {
// Until all stored messages are written to internal send buffer,
// disable further async reading of incoming packets..
Expand All @@ -7150,6 +7181,9 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}
send_store();
}
else {
clear_session_data();
}
}
connack_proc(force_move(session_life_keeper), force_move(info));
} break;
Expand Down Expand Up @@ -9050,7 +9084,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v3_1_1::connect_message(
keep_alive_sec,
force_move(client_id),
clean_session_,
clean_session(),
force_move(w),
force_move(user_name),
force_move(password)
Expand All @@ -9062,7 +9096,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v5::connect_message(
keep_alive_sec,
force_move(client_id),
clean_session_,
clean_start(),
force_move(w),
force_move(user_name),
force_move(password),
Expand Down Expand Up @@ -9546,7 +9580,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v3_1_1::connect_message(
keep_alive_sec,
force_move(client_id),
clean_session_,
clean_session(),
w,
force_move(user_name),
force_move(password)
Expand All @@ -9559,7 +9593,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v5::connect_message(
keep_alive_sec,
force_move(client_id),
clean_session_,
clean_start(),
w,
force_move(user_name),
force_move(password),
Expand Down Expand Up @@ -10322,7 +10356,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
~endpoint() = default;

protected:
bool clean_session_{false};
bool clean_start_{false};

private:
optional<MQTT_NS::socket> socket_;
Expand Down
1 change: 0 additions & 1 deletion include/mqtt/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class server_endpoint : public endpoint<Mutex, LockGuard, PacketIdBytes> {
public:
using endpoint<Mutex, LockGuard, PacketIdBytes>::endpoint;
protected:
bool on_v5_connack(bool, v5::connect_reason_code, v5::properties) noexcept override { return true; }
void on_pre_send() noexcept override {}
void on_close() noexcept override {}
void on_error(error_code /*ec*/) noexcept override {}
Expand Down
4 changes: 2 additions & 2 deletions include/mqtt/v5_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class connect_message {
connect_message(
std::uint16_t keep_alive_sec,
buffer client_id,
bool clean_session,
bool clean_start,
optional<will> w,
optional<buffer> user_name,
optional<buffer> password,
Expand Down Expand Up @@ -179,7 +179,7 @@ class connect_message {
remaining_length_ += property_length_buf_.size() + property_length_;

utf8string_check(client_id_);
if (clean_session) connect_flags_ |= connect_flags::clean_session;
if (clean_start) connect_flags_ |= connect_flags::clean_start;
if (user_name) {
utf8string_check(user_name.value());
connect_flags_ |= connect_flags::user_name_flag;
Expand Down
Loading

0 comments on commit abb4f52

Please sign in to comment.