Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add session expiry interval support #657

Merged
merged 6 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/gha.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -DBOOST_ASIO_NO_DEPRECATED -fsanitize=address
run: |
CXXFLAGS="${CXXFLAGS} -pedantic -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING"
cmake --build ${{ runner.temp }} --parallel $(nproc) --clean-first
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this causing problems?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I experienced build 6 timeout again and again.
When I removed parallel option, timeout disappeared.
I guess that the parallel build requires bigger memory and swapping is happened.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Understood.

cmake --build ${{ runner.temp }} --clean-first
- name: Compile without Sanitizer
if: (matrix.pattern == 0) || (matrix.pattern == 4) || (matrix.pattern == 5) || (matrix.pattern == 6) || (matrix.pattern == 7)
env:
Expand All @@ -88,7 +88,7 @@ jobs:
LDFLAGS: -Werror -g -Wall -Wextra -Wno-ignored-qualifiers -Wconversion -fno-omit-frame-pointer -DBOOST_ASIO_NO_DEPRECATED
run: |
CXXFLAGS="${CXXFLAGS} -pedantic -DBOOST_MULTI_INDEX_ENABLE_SAFE_MODE -DBOOST_MULTI_INDEX_DISABLE_SERIALIZATION -DBOOST_MULTI_INDEX_ENABLE_INVARIANT_CHECKING"
cmake --build ${{ runner.temp }} --parallel $(nproc) --clean-first
cmake --build ${{ runner.temp }} --clean-first
- name: Test
working-directory: ${{ runner.temp }}
run: |
Expand Down
1 change: 1 addition & 0 deletions include/mqtt/callable_overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ struct callable_overlay final : public Impl
MQTT_ALWAYS_INLINE bool on_v5_connack(bool session_present,
v5::connect_reason_code reason_code,
v5::properties props) noexcept override final {
if (!base::on_v5_connack(session_present, reason_code, props)) return false;
return ! h_v5_connack_
|| h_v5_connack_(session_present, reason_code, MQTT_NS::force_move(props));
}
Expand Down
91 changes: 91 additions & 0 deletions include/mqtt/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
}
}
);
set_session_expiry_interval_on_disconnect(props);
base::disconnect(reason_code, force_move(props));
}
}
Expand All @@ -836,6 +837,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
) {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
if (base::connected()) {
set_session_expiry_interval_on_disconnect(props);
base::disconnect(reason_code, force_move(props));
}
}
Expand Down Expand Up @@ -912,6 +914,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
}
}
);
set_session_expiry_interval_on_disconnect(props);
base::async_disconnect(reason_code, force_move(props), force_move(func));
}
}
Expand Down Expand Up @@ -954,6 +957,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
async_handler_t func = async_handler_t()) {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
if (base::connected()) {
set_session_expiry_interval_on_disconnect(props);
base::async_disconnect(reason_code, force_move(props), force_move(func));
}
}
Expand Down Expand Up @@ -1008,6 +1012,8 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
,
path_(force_move(path))
#endif // defined(MQTT_USE_WS)
,
tim_session_expiry_(ioc_)
{
#if defined(MQTT_USE_TLS)
ctx_.set_verify_mode(as::ssl::verify_peer);
Expand Down Expand Up @@ -1291,6 +1297,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
set_timer();
}
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0);
handshake_socket(socket, force_move(props), force_move(session_life_keeper));
}

Expand All @@ -1308,6 +1315,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
set_timer();
}
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0);
handshake_socket(socket, force_move(props), force_move(session_life_keeper), ec);
}

Expand Down Expand Up @@ -1338,6 +1346,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
set_timer();
}
session_expiry_interval_ = get_session_expiry_interval_by_props(props).value_or(0);
async_handshake_socket(socket, force_move(props), force_move(session_life_keeper), force_move(func));
});
}
Expand Down Expand Up @@ -1378,14 +1387,94 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
set_timer();
}

static optional<std::uint32_t> get_session_expiry_interval_by_props(v5::properties const& props) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that the standard explicitly says it's a 32 bit number, but I think we should make this a typedef. E.g "SessionExpiryInterval_t"

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented.

bool finish = false;
optional<std::uint32_t> val;
for (auto const& prop : props) {
MQTT_NS::visit(
make_lambda_visitor(
[&finish, &val](v5::property::session_expiry_interval const& p) {
val = p.val();
finish = true;
},
[](auto&&) {
}
), prop
);
if (finish) break;
}
return val;
}

void set_session_expiry_interval_on_disconnect(v5::properties const& props) {
auto sei = get_session_expiry_interval_by_props(props);
if (!sei) return;

if (session_expiry_interval_ == 0 && sei.value() != 0) {
MQTT_LOG("mqtt_api", warning)
<< MQTT_ADD_VALUE(address, this)
<< "session_expiry_interval was 0 but at disconnect "
<< sei.value()
<< " is set.";
}
session_expiry_interval_ = sei.value();
}

void set_session_expiry_timer() {
if (base::get_protocol_version() != protocol_version::v5) return;
if (session_expiry_interval_== 0xffffffff) return;
if (session_expiry_interval_== 0) {
base::clear_session_data();
return;
}
tim_session_expiry_.expires_after(std::chrono::seconds(session_expiry_interval_));
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
tim_session_expiry_.async_wait(
[wp = force_move(wp)](error_code ec) {
if (auto sp = wp.lock()) {
if (!ec) {
sp->clear_session_data();
}
}
}
);

}

public:
void cancel_session_expiry_timer() {
tim_session_expiry_.cancel();
}

protected:
bool on_v5_connack(bool session_present,
v5::connect_reason_code reason_code,
v5::properties props) noexcept override {
cancel_session_expiry_timer();

// The current implementation is simply
// overwrite by broker's session expiry interval
//
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901082
// broker could return session expiry interval with CONNACK
// only when the client sent CONNECT withou session expiry interval.
// The spec doesn't say this rule violation case.
auto sei = get_session_expiry_interval_by_props(props);
if (sei) {
session_expiry_interval_ = sei.value();
}
return true;
}

void on_close() noexcept override {
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
set_session_expiry_timer();
}

void on_error(error_code ec) noexcept override {
(void)ec;
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
set_session_expiry_timer();
}

// Ensure that only code that knows the *exact* type of an object
Expand Down Expand Up @@ -1434,6 +1523,8 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
#if defined(MQTT_USE_WS)
std::string path_;
#endif // defined(MQTT_USE_WS)
std::uint32_t session_expiry_interval_ = 0;
as::steady_timer tim_session_expiry_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does client side need to have timer at all?

I thought the timer was a server-side only thing?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first, I thought so too.
However, the sped said that
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048

The Client and Server MUST store the Session State after the Network Connection is closed if the Session Expiry Interval is greater than 0 [MQTT-3.1.2-23].

https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901231

The Client and Server MUST NOT discard the Session State while the Network Connection is open [MQTT-4.1.0-1]. The Server MUST discard the Session State when the Network Connection is closed and the Session Expiry Interval has passed [MQTT-4.1.0-2].

Consider the following scenario:

  1. A client CONNECT (SessionExpiryInterval=10) to the broker. Note CleanStart is not important here.
  2. The client PUBLISH QoS1 message1 to topic1 PacketId=100
  3. Network is disconnected before receiving PUBACK PacketId=100.
  4. Wait some seconds.
  5. The client CONNECT (CleanStart=false) again to the broker. Note SessionExpiryInterval is not important here.

If waiting seconds< 10, the client should resend PUBLISH at the step2.
If waiting second >= 10, the client shouldn't resend PUBLISH at the step2.

The difference is caused by the client side session expiration process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

};

inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
Expand Down
11 changes: 7 additions & 4 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4567,8 +4567,13 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
version_ = version;
}

private:
void clear_session_data() {
LockGuard<Mutex> lck (store_mtx_);
store_.clear();
packet_id_.clear();
}

private:
bool check_transferred_length(
std::size_t bytes_transferred,
std::size_t bytes_expected) {
Expand Down Expand Up @@ -6841,9 +6846,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
|| ( (1 == variant_idx(info.reason_code))
&& (v5::connect_reason_code::success == variant_get<v5::connect_reason_code>(info.reason_code)))) {
if (clean_session_) {
LockGuard<Mutex> lck (store_mtx_);
store_.clear();
packet_id_.clear();
clear_session_data();
}
else {
if (async_send_store_) {
Expand Down
1 change: 1 addition & 0 deletions include/mqtt/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class server_endpoint : public endpoint<Mutex, LockGuard, PacketIdBytes> {
public:
using endpoint<Mutex, LockGuard, PacketIdBytes>::endpoint;
protected:
bool on_v5_connack(bool, v5::connect_reason_code, v5::properties) noexcept override { return true; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Necessary to add? I don't see a new pure-virtual function added to endpoint.hpp ?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is for the existing pure-vritual function at endpoint.hpp. (MQTT v5 CONNACK handler).

However, it depends on the client side session expiry process.

If the client side session expiry process is required, the client need to know the value of SessionExpiryInterval.

In the following case, the value is notified by the broker.

https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901082

If the Session Expiry Interval is absent the value in the CONNECT Packet used. The server uses this property to inform the Client that it is using a value other than that sent by the Client in the CONNACK. Refer to section 3.1.2.11.2 for a description of the use of Session Expiry Interval.

So the client needs to store the value.
See https://github.com/redboltz/mqtt_cpp/pull/657/files#diff-3d4267aa87c422df49b85a5e6fe94072R1450

callable_overlay call the base class of on_v5_connack.
https://github.com/redboltz/mqtt_cpp/pull/657/files#diff-456c816ee881b91e76edab876a15542eR359

It causes link error at the server. (Note: Both client and server use callable_overlay)

It is the same pattern as https://github.com/redboltz/mqtt_cpp/pull/657/files#diff-dfd5df761d5065103d919d255c1389b9R42

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Makes sense.

void on_pre_send() noexcept override {}
void on_close() noexcept override {}
void on_error(error_code /*ec*/) noexcept override {}
Expand Down
4 changes: 4 additions & 0 deletions test/combi_test.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ inline void do_test(
iocb,
[&] {
s->close();
b.clear_all_sessions();
}
);
},
Expand Down Expand Up @@ -116,6 +117,7 @@ inline void do_tls_test(
iocb,
[&] {
s->close();
b.clear_all_sessions();
}
);
},
Expand Down Expand Up @@ -164,6 +166,7 @@ inline void do_ws_test(
iocb,
[&] {
s->close();
b.clear_all_sessions();
}
);
},
Expand Down Expand Up @@ -214,6 +217,7 @@ inline void do_tls_ws_test(
iocb,
[&] {
s->close();
b.clear_all_sessions();
}
);
},
Expand Down
87 changes: 72 additions & 15 deletions test/connect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,9 @@ BOOST_AUTO_TEST_CASE( noclean ) {
break;
case 1:
MQTT_CHK("h_connack2");
BOOST_TEST(sp == true);
// The previous connection is not set Session Expiry Interval.
// That means session state is cleared on close.
BOOST_TEST(sp == false);
break;
case 2:
MQTT_CHK("h_connack3");
Expand Down Expand Up @@ -1193,9 +1195,7 @@ BOOST_AUTO_TEST_CASE( async_pingresp_timeout ) {
do_combi_test_async(test);
}



BOOST_AUTO_TEST_CASE( connect_disconnect_prop ) {
BOOST_AUTO_TEST_CASE( connect_prop ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& b) {
if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) {
finish();
Expand Down Expand Up @@ -1227,16 +1227,6 @@ BOOST_AUTO_TEST_CASE( connect_disconnect_prop ) {

std::size_t con_user_prop_count = 0;

MQTT_NS::v5::properties discon_ps {
MQTT_NS::v5::property::session_expiry_interval(0x12345678UL),
MQTT_NS::v5::property::reason_string("test reason string"_mb),
MQTT_NS::v5::property::user_property("key1"_mb, "val1"_mb),
MQTT_NS::v5::property::user_property("key2"_mb, "val2"_mb),
MQTT_NS::v5::property::server_reference("test server reference"_mb),
};

std::size_t discon_user_prop_count = 0;

b.set_connect_props_handler(
[&con_user_prop_count, size = con_ps.size()] (MQTT_NS::v5::properties const& props) {
BOOST_TEST(size == props.size());
Expand Down Expand Up @@ -1292,6 +1282,67 @@ BOOST_AUTO_TEST_CASE( connect_disconnect_prop ) {
}
);

c->set_v5_connack_handler(
[&chk, &c]
(bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("h_connack");
BOOST_TEST(c->connected() == true);
BOOST_TEST(sp == false);
BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success);

c->disconnect(MQTT_NS::v5::disconnect_reason_code::normal_disconnection, {});
BOOST_TEST(c->connected() == true);
return true;
});

c->set_close_handler(
[&chk, &finish, &c]
() {
MQTT_CHK("h_close");
BOOST_TEST(c->connected() == false);
c->cancel_session_expiry_timer();
finish();
});
c->set_error_handler(
[]
(MQTT_NS::error_code) {
BOOST_CHECK(false);
});
c->connect(std::move(con_ps));
BOOST_TEST(c->connected() == false);
ioc.run();
BOOST_TEST(chk.all());
};
do_combi_test_sync(test);
}

BOOST_AUTO_TEST_CASE( disconnect_prop ) {
auto test = [](boost::asio::io_context& ioc, auto& c, auto finish, auto& b) {
if (c->get_protocol_version() != MQTT_NS::protocol_version::v5) {
finish();
return;
}
c->set_client_id("cid1");
c->set_clean_session(true);
BOOST_TEST(c->connected() == false);

checker chk = {
// connect
cont("h_connack"),
// disconnect
cont("h_close"),
};

MQTT_NS::v5::properties discon_ps {
MQTT_NS::v5::property::session_expiry_interval(0x12345678UL),
MQTT_NS::v5::property::reason_string("test reason string"_mb),
MQTT_NS::v5::property::user_property("key1"_mb, "val1"_mb),
MQTT_NS::v5::property::user_property("key2"_mb, "val2"_mb),
MQTT_NS::v5::property::server_reference("test server reference"_mb),
};

std::size_t discon_user_prop_count = 0;

b.set_disconnect_props_handler(
[&discon_user_prop_count, size = discon_ps.size()] (MQTT_NS::v5::properties const& props) {
BOOST_TEST(size == props.size());
Expand Down Expand Up @@ -1350,14 +1401,20 @@ BOOST_AUTO_TEST_CASE( connect_disconnect_prop ) {
() {
MQTT_CHK("h_close");
BOOST_TEST(c->connected() == false);
c->cancel_session_expiry_timer();
finish();
});
c->set_error_handler(
[]
(MQTT_NS::error_code) {
BOOST_CHECK(false);
});
c->connect(std::move(con_ps));

c->connect(
MQTT_NS::v5::properties {
MQTT_NS::v5::property::session_expiry_interval(1) // to avoid protocol error
}
);
BOOST_TEST(c->connected() == false);
ioc.run();
BOOST_TEST(chk.all());
Expand Down
Loading