-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add session expiry interval support #657
Changes from 3 commits
66dd3a0
e1db98a
5a2b3d7
d201b13
47af749
8f0a0b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -811,6 +811,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
} | ||
} | ||
); | ||
set_session_expiry_interval_on_disconnect(props); | ||
base::disconnect(reason_code, force_move(props)); | ||
} | ||
} | ||
|
@@ -836,6 +837,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
) { | ||
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel(); | ||
if (base::connected()) { | ||
set_session_expiry_interval_on_disconnect(props); | ||
base::disconnect(reason_code, force_move(props)); | ||
} | ||
} | ||
|
@@ -912,6 +914,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
} | ||
} | ||
); | ||
set_session_expiry_interval_on_disconnect(props); | ||
base::async_disconnect(reason_code, force_move(props), force_move(func)); | ||
} | ||
} | ||
|
@@ -954,6 +957,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
async_handler_t func = async_handler_t()) { | ||
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel(); | ||
if (base::connected()) { | ||
set_session_expiry_interval_on_disconnect(props); | ||
base::async_disconnect(reason_code, force_move(props), force_move(func)); | ||
} | ||
} | ||
|
@@ -1008,6 +1012,8 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
, | ||
path_(force_move(path)) | ||
#endif // defined(MQTT_USE_WS) | ||
, | ||
tim_session_expiry_(ioc_) | ||
{ | ||
#if defined(MQTT_USE_TLS) | ||
ctx_.set_verify_mode(as::ssl::verify_peer); | ||
|
@@ -1291,6 +1297,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) { | ||
set_timer(); | ||
} | ||
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0); | ||
handshake_socket(socket, force_move(props), force_move(session_life_keeper)); | ||
} | ||
|
||
|
@@ -1308,6 +1315,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) { | ||
set_timer(); | ||
} | ||
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0); | ||
handshake_socket(socket, force_move(props), force_move(session_life_keeper), ec); | ||
} | ||
|
||
|
@@ -1338,6 +1346,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) { | ||
set_timer(); | ||
} | ||
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0); | ||
async_handshake_socket(socket, force_move(props), force_move(session_life_keeper), force_move(func)); | ||
}); | ||
} | ||
|
@@ -1378,14 +1387,94 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
set_timer(); | ||
} | ||
|
||
static optional<std::uint32_t> get_session_expiry_interval_by_props(v5::properties const& props) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know that the standard explicitly says it's a 32 bit number, but I think we should make this a typedef. E.g "SessionExpiryInterval_t" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implemented. |
||
bool finish = false; | ||
optional<std::uint32_t> val; | ||
for (auto const& prop : props) { | ||
MQTT_NS::visit( | ||
make_lambda_visitor( | ||
[&finish, &val](v5::property::session_expiry_interval const& p) { | ||
val = p.val(); | ||
finish = true; | ||
}, | ||
[](auto&&) { | ||
} | ||
), prop | ||
); | ||
if (finish) break; | ||
} | ||
return val; | ||
} | ||
|
||
void set_session_expiry_interval_on_disconnect(v5::properties const& props) { | ||
auto sei = get_session_expiry_interval_by_props(props); | ||
if (!sei) return; | ||
|
||
if (session_expiry_interval_ == 0 && sei.value() != 0) { | ||
MQTT_LOG("mqtt_api", warning) | ||
<< MQTT_ADD_VALUE(address, this) | ||
<< "session_expiry_interval was 0 but at disconnect " | ||
<< sei.value() | ||
<< " is set."; | ||
} | ||
session_expiry_interval_ = sei.value(); | ||
} | ||
|
||
void set_session_expiry_timer() { | ||
if (base::get_protocol_version() != protocol_version::v5) return; | ||
if (session_expiry_interval_== 0xffffffff) return; | ||
if (session_expiry_interval_== 0) { | ||
base::clear_session_data(); | ||
return; | ||
} | ||
tim_session_expiry_.expires_after(std::chrono::seconds(session_expiry_interval_)); | ||
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this())); | ||
tim_session_expiry_.async_wait( | ||
[wp = force_move(wp)](error_code ec) { | ||
if (auto sp = wp.lock()) { | ||
if (!ec) { | ||
sp->clear_session_data(); | ||
} | ||
} | ||
} | ||
); | ||
|
||
} | ||
|
||
public: | ||
void cancel_session_expiry_timer() { | ||
tim_session_expiry_.cancel(); | ||
} | ||
|
||
protected: | ||
bool on_v5_connack(bool session_present, | ||
v5::connect_reason_code reason_code, | ||
v5::properties props) noexcept override { | ||
cancel_session_expiry_timer(); | ||
|
||
// The current implementation is simply | ||
// overwrite by broker's session expiry interval | ||
// | ||
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901082 | ||
// broker could return session expiry interval with CONNACK | ||
// only when the client sent CONNECT withou session expiry interval. | ||
// The spec doesn't say this rule violation case. | ||
auto sei = get_session_expiry_interval_by_props(props); | ||
if (sei) { | ||
session_expiry_interval_ = sei.value(); | ||
} | ||
return true; | ||
} | ||
|
||
void on_close() noexcept override { | ||
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel(); | ||
set_session_expiry_timer(); | ||
} | ||
|
||
void on_error(error_code ec) noexcept override { | ||
(void)ec; | ||
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel(); | ||
set_session_expiry_timer(); | ||
} | ||
|
||
// Ensure that only code that knows the *exact* type of an object | ||
|
@@ -1434,6 +1523,8 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |
#if defined(MQTT_USE_WS) | ||
std::string path_; | ||
#endif // defined(MQTT_USE_WS) | ||
std::uint32_t session_expiry_interval_ = 0; | ||
as::steady_timer tim_session_expiry_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does client side need to have timer at all? I thought the timer was a server-side only thing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At first, I thought so too.
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901231
Consider the following scenario:
If waiting seconds< 10, the client should resend PUBLISH at the step2. The difference is caused by the client side session expiration process. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. |
||
}; | ||
|
||
inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ class server_endpoint : public endpoint<Mutex, LockGuard, PacketIdBytes> { | |
public: | ||
using endpoint<Mutex, LockGuard, PacketIdBytes>::endpoint; | ||
protected: | ||
bool on_v5_connack(bool, v5::connect_reason_code, v5::properties) noexcept override { return true; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Necessary to add? I don't see a new pure-virtual function added to endpoint.hpp ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. It is for the existing pure-vritual function at endpoint.hpp. (MQTT v5 CONNACK handler). However, it depends on the client side session expiry process. If the client side session expiry process is required, the client need to know the value of SessionExpiryInterval. In the following case, the value is notified by the broker. https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901082
So the client needs to store the value. callable_overlay call the base class of on_v5_connack. It causes link error at the server. (Note: Both client and server use callable_overlay) It is the same pattern as https://github.com/redboltz/mqtt_cpp/pull/657/files#diff-dfd5df761d5065103d919d255c1389b9R42 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Makes sense. |
||
void on_pre_send() noexcept override {} | ||
void on_close() noexcept override {} | ||
void on_error(error_code /*ec*/) noexcept override {} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this causing problems?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I experienced build 6 timeout again and again.
When I removed parallel option, timeout disappeared.
I guess that the parallel build requires bigger memory and swapping is happened.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Understood.