Skip to content

Commit

Permalink
Merge pull request #869 from redboltz/add_broker_no_connack_test
Browse files Browse the repository at this point in the history
Added no CONNACK response broker emulation.
  • Loading branch information
redboltz authored Sep 25, 2021
2 parents 6d26414 + 6acae0f commit f41bf2b
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 40 deletions.
140 changes: 100 additions & 40 deletions include/mqtt/broker/broker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,22 @@ class broker_t {
* @brief set pingresp send operaton
*
* @param b - if true, send pingresp when pingreq is received.
* if false, doesn't send pingrespp for test.
* if false, doesn't send pingresp for test.
*/
void set_pingresp(bool b) {
pingresp_ = b;
}

/**
* @brief set pingresp send operaton
*
* @param b - if true, send connack when connect is received.
* if false, doesn't send connack for test.
*/
void set_connack(bool b) {
connack_ = b;
}

// [end] for test setting

/**
Expand Down Expand Up @@ -381,7 +392,7 @@ class broker_t {
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "send CONNACK reason_code:" << rc.value();
sp->async_connack(
if (connack_) sp->async_connack(
false,
rc.value(),
[sp]
Expand Down Expand Up @@ -1026,13 +1037,13 @@ class broker_t {
switch (ep.get_protocol_version()) {
case protocol_version::v3_1_1:
if (client_id.empty() && !clean_start) {
ep.connack(false, connect_return_code::identifier_rejected);
if (connack_) ep.async_connack(false, connect_return_code::identifier_rejected);
return false;
}
break;
case protocol_version::v5:
if (client_id.empty() && !clean_start) {
ep.connack(false, v5::connect_reason_code::client_identifier_not_valid);
if (connack_) ep.async_connack(false, v5::connect_reason_code::client_identifier_not_valid);
return false;
}
break;
Expand All @@ -1042,31 +1053,43 @@ class broker_t {
}

auto send_connack =
[&](bool session_present) {
[&](bool session_present, std::function<void(error_code)> finish = [](error_code){}) {
// Reply to the connect message.
switch (ep.get_protocol_version()) {
case protocol_version::v3_1_1:
ep.connack(
if (connack_) ep.async_connack(
session_present,
connect_return_code::accepted
connect_return_code::accepted,
[finish = force_move(finish)]
(error_code ec) {
finish(ec);
}
);
break;
case protocol_version::v5:
if (connack_props_.empty()) {
ep.connack(
if (connack_) ep.async_connack(
session_present,
v5::connect_reason_code::success,
v5::properties{
v5::property::topic_alias_maximum{topic_alias_max},
v5::property::receive_maximum{receive_maximum_max}
},
[finish = force_move(finish)]
(error_code ec) {
finish(ec);
}
);
}
else {
ep.connack(
if (connack_) ep.async_connack(
session_present,
v5::connect_reason_code::success,
connack_props_
connack_props_,
[finish = force_move(finish)]
(error_code ec) {
finish(ec);
}
);
}
break;
Expand Down Expand Up @@ -1110,7 +1133,6 @@ class broker_t {
force_move(will_expiry_interval),
force_move(session_expiry_interval)
);

send_connack(false);
}
else if (it->online()) {
Expand Down Expand Up @@ -1141,20 +1163,38 @@ class broker_t {
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "online connection exists, inherit old one and renew";
send_connack(true);
idx.modify(
it,
[&](auto& e) {
e.renew(spep, clean_start);
e.update_will(timer_ioc_, force_move(will), will_expiry_interval);
// renew_session_expiry updates index
e.renew_session_expiry(force_move(session_expiry_interval));
e.send_inflight_messages();
e.send_all_offline_messages();
},
[](auto&) { BOOST_ASSERT(false); }
send_connack(
true,
[
this,
&idx,
it,
will = force_move(will),
clean_start,
spep,
will_expiry_interval,
session_expiry_interval
](error_code ec) mutable {
if (ec) {
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< ec.message();
return;
}
idx.modify(
it,
[&](auto& e) {
e.renew(spep, clean_start);
e.update_will(timer_ioc_, force_move(will), will_expiry_interval);
// renew_session_expiry updates index
e.renew_session_expiry(force_move(session_expiry_interval));
e.send_inflight_messages();
e.send_all_offline_messages();
},
[](auto&) { BOOST_ASSERT(false); }
);
}
);
// send offline messages
}
}
else {
Expand Down Expand Up @@ -1206,22 +1246,41 @@ class broker_t {
}
else {
// inherit offline session
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "offline connection exists, inherit old one and renew";
send_connack(true);
idx.modify(
it,
[&](auto& e) {
e.renew(spep, clean_start);
e.update_will(timer_ioc_, force_move(will), will_expiry_interval);
// renew_session_expiry updates index
e.renew_session_expiry(force_move(session_expiry_interval));
e.send_inflight_messages();
e.send_all_offline_messages();
},
[](auto&) { BOOST_ASSERT(false); }
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "offline connection exists, inherit old one and renew";
send_connack(
true,
[
this,
&idx,
it,
will = force_move(will),
clean_start,
spep,
will_expiry_interval,
session_expiry_interval
](error_code ec) mutable {
if (ec) {
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< ec.message();
return;
}
idx.modify(
it,
[&](auto& e) {
e.renew(spep, clean_start);
e.update_will(timer_ioc_, force_move(will), will_expiry_interval);
// renew_session_expiry updates index
e.renew_session_expiry(force_move(session_expiry_interval));
e.send_inflight_messages();
e.send_all_offline_messages();
},
[](auto&) { BOOST_ASSERT(false); }
);
}
);
}
}
Expand Down Expand Up @@ -2092,6 +2151,7 @@ class broker_t {
std::function<void(v5::properties const&)> h_unsubscribe_props_;
std::function<void(v5::properties const&)> h_auth_props_;
bool pingresp_ = true;
bool connack_ = true;
};

MQTT_BROKER_NS_END
Expand Down
116 changes: 116 additions & 0 deletions test/system/st_connect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,7 @@ BOOST_AUTO_TEST_CASE( async_connect_retry_before_cb ) {
auto& c = cs[0];
clear_ordered();
c->set_client_id("cid1");
c->set_clean_session(true);

checker chk = {
// connect
Expand Down Expand Up @@ -1411,6 +1412,121 @@ BOOST_AUTO_TEST_CASE( async_connect_retry_before_cb ) {
do_combi_test_async(test);
}

BOOST_AUTO_TEST_CASE( async_connect_retry_broker_no_connack ) {
auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& b) {
auto& c = cs[0];
clear_ordered();
c->set_client_id("cid1");
c->set_clean_session(true);
b.set_connack(false); // set broker no connack send mode for test
checker chk = {
cont("async_connect1"),
cont("async_connect1_timer_set"),
cont("h_async_connect1"), // underlying connected
// no CONNACK is sent by broker
deps("async_connect1_timer_fired", "async_connect1_timer_set"),
cont("async_force_disconnect"),
cont("h_async_force_disconnect"),

// broker recoverd as sending CONNACK
deps("async_connect2", "async_force_disconnect"),
cont("async_connect2_timer_set"),
cont("h_async_connect2"), // underlying connected

cont("h_connack2"),
// disconnect
cont("h_close2"),
deps("async_connect2_timer_aborted", "h_connack2"),
};

boost::asio::steady_timer tim(ioc);
switch (c->get_protocol_version()) {
case MQTT_NS::protocol_version::v3_1_1:
c->set_connack_handler(
[&]
(bool sp, MQTT_NS::connect_return_code connack_return_code) {
MQTT_CHK("h_connack2");
tim.cancel();
BOOST_TEST(sp == false);
BOOST_TEST(connack_return_code == MQTT_NS::connect_return_code::accepted);
c->async_disconnect();
return true;
});
break;
case MQTT_NS::protocol_version::v5:
c->set_v5_connack_handler(
[&]
(bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("h_connack2");
tim.cancel();
BOOST_TEST(sp == false);
BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success);
c->async_disconnect();
return true;
});
break;
default:
BOOST_CHECK(false);
break;
}

c->set_close_handler(
[&]
() {
MQTT_CHK("h_close2");
finish();
});

c->set_error_handler(
[&]
(MQTT_NS::error_code) {
b.set_connack(true); // broker recovered for test
MQTT_CHK("async_connect2");
c->async_connect(
[&](MQTT_NS::error_code ec) {
MQTT_CHK("h_async_connect2");
BOOST_TEST(!ec);
}
);
MQTT_CHK("async_connect2_timer_set");
tim.expires_after(std::chrono::seconds(3));
tim.async_wait(
[&] (boost::system::error_code ec) {
BOOST_TEST(ec == boost::asio::error::operation_aborted);
MQTT_CHK("async_connect2_timer_aborted");
}
);
});

MQTT_CHK("async_connect1");
c->async_connect(
[&](MQTT_NS::error_code ec) {
MQTT_CHK("h_async_connect1");
BOOST_TEST(!ec);
}
);
tim.expires_after(std::chrono::seconds(3));
MQTT_CHK("async_connect1_timer_set");
tim.async_wait(
[&] (boost::system::error_code ec) {
BOOST_TEST(!ec);
MQTT_CHK("async_connect1_timer_fired");
MQTT_CHK("async_force_disconnect");
c->async_force_disconnect(
[&](MQTT_NS::error_code ec) {
MQTT_CHK("h_async_force_disconnect");
BOOST_TEST(!ec);
}
);
}
);

ioc.run();
BOOST_TEST(chk.all());
};
do_combi_test_async(test);
}

BOOST_AUTO_TEST_CASE( connect_prop ) {
auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& b) {
auto& c = cs[0];
Expand Down

0 comments on commit f41bf2b

Please sign in to comment.