-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add session expiry interval support #657
Conversation
Broker needs a different approach.
Codecov Report
@@ Coverage Diff @@
## master #657 +/- ##
==========================================
+ Coverage 82.09% 82.14% +0.04%
==========================================
Files 45 45
Lines 6949 7006 +57
==========================================
+ Hits 5705 5755 +50
- Misses 1244 1251 +7 |
Memory consumption is more dominant.
b6535ba
to
5a2b3d7
Compare
@@ -79,7 +79,7 @@ jobs: | |||
LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -DBOOST_ASIO_NO_DEPRECATED -fsanitize=address | |||
run: | | |||
CXXFLAGS="${CXXFLAGS} -pedantic -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING" | |||
cmake --build ${{ runner.temp }} --parallel $(nproc) --clean-first |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this causing problems?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I experienced build 6 timeout again and again.
When I removed parallel option, timeout disappeared.
I guess that the parallel build requires bigger memory and swapping is happened.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Understood.
test/test_broker.hpp
Outdated
@@ -997,11 +1017,26 @@ class test_broker { | |||
BOOST_ASSERT(non_active_sessions_.get<tag_client_id>().count(client_id) == 0); | |||
BOOST_ASSERT(non_active_sessions_.get<tag_client_id>().find(client_id) == non_active_sessions_.get<tag_client_id>().end()); | |||
|
|||
auto const& sei_opt = state->session_expiry_interval; | |||
if (sei_opt && sei_opt.value() != std::chrono::seconds(0xffffffffUL)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::numeric_limits<std::chrono::seconds::rep>::max()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better
std::chrono::seconds::max()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that std::chrono::seconds can hold the value that is greater than 0xffffffff.
It is limited by MQTT v5 spec. 0xffffffff has the special meaning that never expire.
I thin that I should define static constexpr std::uint32_t const session_never_expire = 0xffffffffUL
somewhere.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
Yes, this should be a static constexpr with documentation. I agree.
Yes, should not be ::max().
test/test_broker.hpp
Outdated
@@ -1107,18 +1143,26 @@ class test_broker { | |||
mi::ordered_unique< | |||
mi::tag<tag_client_id>, | |||
BOOST_MULTI_INDEX_MEMBER(session_state, MQTT_NS::buffer, client_id) | |||
>, | |||
mi::ordered_unique< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am confused why you have converted the session_state to a shared pointer? Was the client id and connection shared-ptr not enough to uniquely identify a session?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See https://github.com/redboltz/mqtt_cpp/pull/657/files#r476833703
You clear con
. So I cannot use connection shared_ptr for non_active_sessions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should use clientid to identify non-active session, since only one session (active or non-active) can exist for any specific clientid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... I considered using client_id.
- A client CONNECT (ClientId=cid1, SessionExpiryInterval=100) to the broker
- The broker added the client to active_sessions.
- The client disconnected.
- The broker removed the client from active_sessions and added to non_active_sessions.
- The broker set timeout (100sec). "cid1" is captured.
- The client CONNECT (ClientId=cid1, CleanStart=false, SessionExpiryInterval=0xffffffff) again.
- The broker removed the client from non_active_sessions and added the new one to active_sessions.
- Timer fired. Boost.Asio removed timer from timer management area and post io_context event. *1
- session_state object is destroyed here and timer is cancelled in the destructor. But no matched timer.
- The client disconnected.
- The broker removed the client from active_sessions and added to non_active_sessions.
- *1 event is processed and session_state is removed by the "cid1" even though the session has infinity lifetime.
Step 8 is controlled by Boost.Asio library. The timing is unpredictable. If Boost.Asio doesn't have internal thread, there is no interruption but I'm afraid that this assumption.
Step 8, 9 See https://stackoverflow.com/questions/43045192/how-to-avoid-firing-already-destroyed-boostasiodeadline-timer
I want to avoid unexpected match to new created session_state.
Using address is simpler way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure the above scenario is actually happen. In the single thread environment it might never happen.
My point is dividing the issue.
Timer and weak_ptr approach works independently. Simply, unexpected match and/or accessing destructed object is never happen.
I'd like to separate timer issue from MQTT client_id unique constraint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are correct that in multithreaded environment, rapid connect, disconnect, connect, disconnect (similar to your example), can happen.
This would cause timer to potentially remove non-active-session that was for previous session.
Possible solutions:
- Have timer check address of timer object in non-active session_state object to be removed, and compare to address of timer.
Problem, potentially same address re-used by allocator.
- Some boost multi-index node ID of some sort?
Problem, may not exist, or if it exists, may be reused, so not unique.
-
Boost-multi-index intrusive pointer support? Either internal to boost-multi-index, or part of "session_state" struct.
-
Store boost-UUID in session-state object, copy uuid into timer, compare uuids before erasing.
-
This solution -- increased allocations, increased redirections caused by pointer access, instead of direct struct access.
-
Store mqtt_cpp::shared_scope_guard in session_state object, then store weak-ptr to shared_scope_guard in timer-lambda.
Benefits compared to 5)
Same number of allocations, but allocation of mqtt_cpp::shared_scope_guard is smaller than allocation of session_state.
Fewer pointer redirections. Quicker struct access.
Can improve test_broker code by moving session cleanup logic to the mqtt_cpp::shared_scope_guard.
- This solution + change use of boost-multi-index so that other multi-indexes take advantage of the shared_ptr<session_state>. For example, maybe can improved saved subscriptions storage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the comment. I will try 6.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the code. I found good approach. It is similar to 6. session_state is no longer shared_ptr. Only steady_clock tim_session_expiry is shared_ptr. It is default constructed at first and only when timer is required, then it is initialized by shared_ptr (make_shared is used). Timer is automatically cancelled via session_state destructor (shared_ptr dispose).
session_state pointer redirection is the same as the original (master code).
@@ -1011,7 +1046,7 @@ class test_broker { | |||
auto& idx = subs_.get<tag_con>(); | |||
auto const& range = boost::make_iterator_range(idx.equal_range(spep)); | |||
// In v3_1_1, session_expiry_interval is not set. So clean on close. | |||
if (ep.clean_session() && session_clear) { | |||
if (session_clear) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this cause problems for 3_1_1 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it is no problem.
Because 3_1_1 is considered at
https://github.com/redboltz/mqtt_cpp/pull/657/files#diff-45ca9626e6c7c611be2ceb2ff63031a5R976
and reflected to the value session_clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
else { | ||
BOOST_ASSERT(ep.get_protocol_version() == MQTT_NS::protocol_version::v5); | ||
auto const& sei_opt = (*act_sess_it)->session_expiry_interval; | ||
return !sei_opt || sei_opt.value() == std::chrono::steady_clock::duration::zero(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think
std::chrono::seconds::zero()
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right! I will fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that there are many place using std::chrono::steady_clock::duration::zero()
.
For example
mqtt_cpp/include/mqtt/client.hpp
Line 369 in 5ae194f
if ((ping_duration_ != std::chrono::steady_clock::duration::zero()) && base::connected() && (ping == std::chrono::steady_clock::duration::zero())) { |
I'm not 100% sure but it is introduced by you. (not checked log...)
The type of sessions::session_expiry_interval
is MQTT_NS::optional<std::chrono::steady_clock::duration> session_expiry_interval
.
I think that std::chrono::steady_clock::duration::zero()
is more natural choice.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would have been me, yes.
Hrmm.
I do not know which is best.
See https://en.cppreference.com/w/cpp/chrono/steady_clock
steady_clock::duration::zero() is not necessarily seconds::zero().
Maybe better to use steady_clock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so.
I keep std::chrono::steady_clock::duration::zero()
code.
test/test_broker.hpp
Outdated
if (sei_opt && sei_opt.value() != std::chrono::seconds(0xffffffffUL)) { | ||
state->tim_session_expiry.expires_after(sei_opt.value()); | ||
state->tim_session_expiry.async_wait( | ||
[&, wp = session_state_wp(state)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't need to be shared_ptr / weak_ptr.
The lambda can store the connection shared pointer, that's sufficient to look up the active session state object,
and client id is sufficient to look up either the active session object, or the inactive session.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You clear session_state::con at https://github.com/redboltz/mqtt_cpp/pull/657/files#diff-45ca9626e6c7c611be2ceb2ff63031a5L988 and move it to non_active_sessions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure removing clearing con
code is good. Also I don't know why you add clearing con
code.
I guess that some of resources are released by the endpoint destructor and it should be happened before insert non_active_sessions.
So I didn't modify the clearing code.
@@ -38,6 +38,7 @@ class server_endpoint : public endpoint<Mutex, LockGuard, PacketIdBytes> { | |||
public: | |||
using endpoint<Mutex, LockGuard, PacketIdBytes>::endpoint; | |||
protected: | |||
bool on_v5_connack(bool, v5::connect_reason_code, v5::properties) noexcept override { return true; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Necessary to add? I don't see a new pure-virtual function added to endpoint.hpp ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It is for the existing pure-vritual function at endpoint.hpp. (MQTT v5 CONNACK handler).
However, it depends on the client side session expiry process.
If the client side session expiry process is required, the client need to know the value of SessionExpiryInterval.
In the following case, the value is notified by the broker.
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901082
If the Session Expiry Interval is absent the value in the CONNECT Packet used. The server uses this property to inform the Client that it is using a value other than that sent by the Client in the CONNACK. Refer to section 3.1.2.11.2 for a description of the use of Session Expiry Interval.
So the client needs to store the value.
See https://github.com/redboltz/mqtt_cpp/pull/657/files#diff-3d4267aa87c422df49b85a5e6fe94072R1450
callable_overlay call the base class of on_v5_connack.
https://github.com/redboltz/mqtt_cpp/pull/657/files#diff-456c816ee881b91e76edab876a15542eR359
It causes link error at the server. (Note: Both client and server use callable_overlay)
It is the same pattern as https://github.com/redboltz/mqtt_cpp/pull/657/files#diff-dfd5df761d5065103d919d255c1389b9R42
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Makes sense.
@@ -1434,6 +1523,8 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |||
#if defined(MQTT_USE_WS) | |||
std::string path_; | |||
#endif // defined(MQTT_USE_WS) | |||
std::uint32_t session_expiry_interval_ = 0; | |||
as::steady_timer tim_session_expiry_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does client side need to have timer at all?
I thought the timer was a server-side only thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At first, I thought so too.
However, the sped said that
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048
The Client and Server MUST store the Session State after the Network Connection is closed if the Session Expiry Interval is greater than 0 [MQTT-3.1.2-23].
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901231
The Client and Server MUST NOT discard the Session State while the Network Connection is open [MQTT-4.1.0-1]. The Server MUST discard the Session State when the Network Connection is closed and the Session Expiry Interval has passed [MQTT-4.1.0-2].
Consider the following scenario:
- A client CONNECT (SessionExpiryInterval=10) to the broker. Note CleanStart is not important here.
- The client PUBLISH QoS1 message1 to topic1 PacketId=100
- Network is disconnected before receiving PUBACK PacketId=100.
- Wait some seconds.
- The client CONNECT (CleanStart=false) again to the broker. Note SessionExpiryInterval is not important here.
If waiting seconds< 10, the client should resend PUBLISH at the step2.
If waiting second >= 10, the client shouldn't resend PUBLISH at the step2.
The difference is caused by the client side session expiration process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
Only steady_timer is shared_ptr.
I think that all issues you pointed out has been solved so the PR is ready to merge. |
include/mqtt/client.hpp
Outdated
@@ -1378,14 +1388,94 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> { | |||
set_timer(); | |||
} | |||
|
|||
static optional<std::uint32_t> get_session_expiry_interval_by_props(v5::properties const& props) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that the standard explicitly says it's a 32 bit number, but I think we should make this a typedef. E.g "SessionExpiryInterval_t"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented.
Except for suggestion for making typedef for session expiry interval, looks good to merge. |
No description provided.