diff --git a/include/mqtt/broker/broker.hpp b/include/mqtt/broker/broker.hpp index 0c4f61a21..ec4702b4b 100644 --- a/include/mqtt/broker/broker.hpp +++ b/include/mqtt/broker/broker.hpp @@ -279,11 +279,22 @@ class broker_t { * @brief set pingresp send operaton * * @param b - if true, send pingresp when pingreq is received. - * if false, doesn't send pingrespp for test. + * if false, doesn't send pingresp for test. */ void set_pingresp(bool b) { pingresp_ = b; } + + /** + * @brief set pingresp send operaton + * + * @param b - if true, send connack when connect is received. + * if false, doesn't send connack for test. + */ + void set_connack(bool b) { + connack_ = b; + } + // [end] for test setting /** @@ -381,7 +392,7 @@ class broker_t { MQTT_LOG("mqtt_broker", trace) << MQTT_ADD_VALUE(address, this) << "send CONNACK reason_code:" << rc.value(); - sp->async_connack( + if (connack_) sp->async_connack( false, rc.value(), [sp] @@ -1026,13 +1037,13 @@ class broker_t { switch (ep.get_protocol_version()) { case protocol_version::v3_1_1: if (client_id.empty() && !clean_start) { - ep.connack(false, connect_return_code::identifier_rejected); + if (connack_) ep.async_connack(false, connect_return_code::identifier_rejected); return false; } break; case protocol_version::v5: if (client_id.empty() && !clean_start) { - ep.connack(false, v5::connect_reason_code::client_identifier_not_valid); + if (connack_) ep.async_connack(false, v5::connect_reason_code::client_identifier_not_valid); return false; } break; @@ -1042,31 +1053,43 @@ class broker_t { } auto send_connack = - [&](bool session_present) { + [&](bool session_present, std::function finish = [](error_code){}) { // Reply to the connect message. switch (ep.get_protocol_version()) { case protocol_version::v3_1_1: - ep.connack( + if (connack_) ep.async_connack( session_present, - connect_return_code::accepted + connect_return_code::accepted, + [finish = force_move(finish)] + (error_code ec) { + finish(ec); + } ); break; case protocol_version::v5: if (connack_props_.empty()) { - ep.connack( + if (connack_) ep.async_connack( session_present, v5::connect_reason_code::success, v5::properties{ v5::property::topic_alias_maximum{topic_alias_max}, v5::property::receive_maximum{receive_maximum_max} + }, + [finish = force_move(finish)] + (error_code ec) { + finish(ec); } ); } else { - ep.connack( + if (connack_) ep.async_connack( session_present, v5::connect_reason_code::success, - connack_props_ + connack_props_, + [finish = force_move(finish)] + (error_code ec) { + finish(ec); + } ); } break; @@ -1110,7 +1133,6 @@ class broker_t { force_move(will_expiry_interval), force_move(session_expiry_interval) ); - send_connack(false); } else if (it->online()) { @@ -1141,20 +1163,38 @@ class broker_t { << MQTT_ADD_VALUE(address, this) << "cid:" << client_id << "online connection exists, inherit old one and renew"; - send_connack(true); - idx.modify( - it, - [&](auto& e) { - e.renew(spep, clean_start); - e.update_will(timer_ioc_, force_move(will), will_expiry_interval); - // renew_session_expiry updates index - e.renew_session_expiry(force_move(session_expiry_interval)); - e.send_inflight_messages(); - e.send_all_offline_messages(); - }, - [](auto&) { BOOST_ASSERT(false); } + send_connack( + true, + [ + this, + &idx, + it, + will = force_move(will), + clean_start, + spep, + will_expiry_interval, + session_expiry_interval + ](error_code ec) mutable { + if (ec) { + MQTT_LOG("mqtt_broker", trace) + << MQTT_ADD_VALUE(address, this) + << ec.message(); + return; + } + idx.modify( + it, + [&](auto& e) { + e.renew(spep, clean_start); + e.update_will(timer_ioc_, force_move(will), will_expiry_interval); + // renew_session_expiry updates index + e.renew_session_expiry(force_move(session_expiry_interval)); + e.send_inflight_messages(); + e.send_all_offline_messages(); + }, + [](auto&) { BOOST_ASSERT(false); } + ); + } ); - // send offline messages } } else { @@ -1206,22 +1246,41 @@ class broker_t { } else { // inherit offline session - MQTT_LOG("mqtt_broker", trace) - << MQTT_ADD_VALUE(address, this) - << "cid:" << client_id - << "offline connection exists, inherit old one and renew"; - send_connack(true); - idx.modify( - it, - [&](auto& e) { - e.renew(spep, clean_start); - e.update_will(timer_ioc_, force_move(will), will_expiry_interval); - // renew_session_expiry updates index - e.renew_session_expiry(force_move(session_expiry_interval)); - e.send_inflight_messages(); - e.send_all_offline_messages(); - }, - [](auto&) { BOOST_ASSERT(false); } + MQTT_LOG("mqtt_broker", trace) + << MQTT_ADD_VALUE(address, this) + << "cid:" << client_id + << "offline connection exists, inherit old one and renew"; + send_connack( + true, + [ + this, + &idx, + it, + will = force_move(will), + clean_start, + spep, + will_expiry_interval, + session_expiry_interval + ](error_code ec) mutable { + if (ec) { + MQTT_LOG("mqtt_broker", trace) + << MQTT_ADD_VALUE(address, this) + << ec.message(); + return; + } + idx.modify( + it, + [&](auto& e) { + e.renew(spep, clean_start); + e.update_will(timer_ioc_, force_move(will), will_expiry_interval); + // renew_session_expiry updates index + e.renew_session_expiry(force_move(session_expiry_interval)); + e.send_inflight_messages(); + e.send_all_offline_messages(); + }, + [](auto&) { BOOST_ASSERT(false); } + ); + } ); } } @@ -2092,6 +2151,7 @@ class broker_t { std::function h_unsubscribe_props_; std::function h_auth_props_; bool pingresp_ = true; + bool connack_ = true; }; MQTT_BROKER_NS_END diff --git a/test/system/st_connect.cpp b/test/system/st_connect.cpp index ffeaeb713..287882963 100644 --- a/test/system/st_connect.cpp +++ b/test/system/st_connect.cpp @@ -1330,6 +1330,7 @@ BOOST_AUTO_TEST_CASE( async_connect_retry_before_cb ) { auto& c = cs[0]; clear_ordered(); c->set_client_id("cid1"); + c->set_clean_session(true); checker chk = { // connect @@ -1411,6 +1412,121 @@ BOOST_AUTO_TEST_CASE( async_connect_retry_before_cb ) { do_combi_test_async(test); } +BOOST_AUTO_TEST_CASE( async_connect_retry_broker_no_connack ) { + auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& b) { + auto& c = cs[0]; + clear_ordered(); + c->set_client_id("cid1"); + c->set_clean_session(true); + b.set_connack(false); // set broker no connack send mode for test + checker chk = { + cont("async_connect1"), + cont("async_connect1_timer_set"), + cont("h_async_connect1"), // underlying connected + // no CONNACK is sent by broker + deps("async_connect1_timer_fired", "async_connect1_timer_set"), + cont("async_force_disconnect"), + cont("h_async_force_disconnect"), + + // broker recoverd as sending CONNACK + deps("async_connect2", "async_force_disconnect"), + cont("async_connect2_timer_set"), + cont("h_async_connect2"), // underlying connected + + cont("h_connack2"), + // disconnect + cont("h_close2"), + deps("async_connect2_timer_aborted", "h_connack2"), + }; + + boost::asio::steady_timer tim(ioc); + switch (c->get_protocol_version()) { + case MQTT_NS::protocol_version::v3_1_1: + c->set_connack_handler( + [&] + (bool sp, MQTT_NS::connect_return_code connack_return_code) { + MQTT_CHK("h_connack2"); + tim.cancel(); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == MQTT_NS::connect_return_code::accepted); + c->async_disconnect(); + return true; + }); + break; + case MQTT_NS::protocol_version::v5: + c->set_v5_connack_handler( + [&] + (bool sp, MQTT_NS::v5::connect_reason_code connect_reason_code, MQTT_NS::v5::properties /*props*/) { + MQTT_CHK("h_connack2"); + tim.cancel(); + BOOST_TEST(sp == false); + BOOST_TEST(connect_reason_code == MQTT_NS::v5::connect_reason_code::success); + c->async_disconnect(); + return true; + }); + break; + default: + BOOST_CHECK(false); + break; + } + + c->set_close_handler( + [&] + () { + MQTT_CHK("h_close2"); + finish(); + }); + + c->set_error_handler( + [&] + (MQTT_NS::error_code) { + b.set_connack(true); // broker recovered for test + MQTT_CHK("async_connect2"); + c->async_connect( + [&](MQTT_NS::error_code ec) { + MQTT_CHK("h_async_connect2"); + BOOST_TEST(!ec); + } + ); + MQTT_CHK("async_connect2_timer_set"); + tim.expires_after(std::chrono::seconds(3)); + tim.async_wait( + [&] (boost::system::error_code ec) { + BOOST_TEST(ec == boost::asio::error::operation_aborted); + MQTT_CHK("async_connect2_timer_aborted"); + } + ); + }); + + MQTT_CHK("async_connect1"); + c->async_connect( + [&](MQTT_NS::error_code ec) { + MQTT_CHK("h_async_connect1"); + BOOST_TEST(!ec); + } + ); + tim.expires_after(std::chrono::seconds(3)); + MQTT_CHK("async_connect1_timer_set"); + tim.async_wait( + [&] (boost::system::error_code ec) { + BOOST_TEST(!ec); + MQTT_CHK("async_connect1_timer_fired"); + MQTT_CHK("async_force_disconnect"); + c->async_force_disconnect( + [&](MQTT_NS::error_code ec) { + MQTT_CHK("h_async_force_disconnect"); + BOOST_TEST(!ec); + } + ); + } + ); + + ioc.run(); + BOOST_TEST(chk.all()); + }; + do_combi_test_async(test); +} + BOOST_AUTO_TEST_CASE( connect_prop ) { auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& b) { auto& c = cs[0];