Skip to content

Commit

Permalink
Fixed assertion failed it != idx.end() at publish_hanler on the broker.
Browse files Browse the repository at this point in the history
  • Loading branch information
redboltz committed Dec 9, 2020
1 parent 56e71af commit 1c31f90
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 2 deletions.
29 changes: 28 additions & 1 deletion include/mqtt/broker/broker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ class broker_t {
if (sp->connected()) {
sp->disconnect(v5::disconnect_reason_code::protocol_error);
}
else { // connecting
else if (sp->underlying_connected()){
// underlying layer connected, mqtt connecting
sp->connack(false, v5::connect_reason_code::protocol_error);
}
}
Expand Down Expand Up @@ -815,6 +816,10 @@ class broker_t {
auto it = idx.lower_bound(client_id);
if (it == idx.end() || it->client_id() != client_id) {
// new connection
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< " new connection inserted.";
it = idx.emplace_hint(
it,
ioc_,
Expand All @@ -835,6 +840,10 @@ class broker_t {
// remain offline
if (clean_start) {
// discard offline session
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "online connection exists, discard old one due to new one's clean_start and renew";
send_connack(false);
idx.modify(
it,
Expand All @@ -849,6 +858,10 @@ class broker_t {
}
else {
// inherit offline session
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "online connection exists, inherit old one and renew";
send_connack(true);
idx.modify(
it,
Expand All @@ -868,6 +881,10 @@ class broker_t {
}
else {
// new connection
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "online connection exists, discard old one due to session_expiry and renew";
bool inserted;
std::tie(it, inserted) = idx.emplace(
ioc_,
Expand All @@ -887,6 +904,10 @@ class broker_t {
// offline -> online
if (clean_start) {
// discard offline session
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "offline connection exists, discard old one due to new one's clean_start and renew";
send_connack(false);
idx.modify(
it,
Expand All @@ -902,6 +923,10 @@ class broker_t {
}
else {
// inherit offline session
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "cid:" << client_id
<< "offline connection exists, inherit old one and renew";
send_connack(true);
idx.modify(
it,
Expand Down Expand Up @@ -1012,6 +1037,7 @@ class broker_t {
it,
[&](session_state& e) {
do_send_will(e);
e.con()->force_disconnect();
},
[](auto&) { BOOST_ASSERT(false); }
);
Expand All @@ -1024,6 +1050,7 @@ class broker_t {
it,
[&](session_state& e) {
do_send_will(e);
e.con()->force_disconnect();
e.become_offline(
[this]
(std::shared_ptr<as::steady_timer> const& sp_tim) {
Expand Down
1 change: 0 additions & 1 deletion include/mqtt/broker/session_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ struct session_state {
// See
// https://lists.oasis-open.org/archives/mqtt-comment/202009/msg00000.html
topic_alias_recv_ = con_->get_topic_alias_recv_container();

reset_con();

if (session_expiry_interval_ &&
Expand Down
8 changes: 8 additions & 0 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4671,6 +4671,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
return connected_ && mqtt_connected_;
}

/**
* @brief Check underlying layer connection status
* @return current connection status
*/
bool underlying_connected() const {
return connected_;
}

/**
* @brief Trigger next mqtt message manually.
* If you call this function, you need to set manual receive mode
Expand Down

0 comments on commit 1c31f90

Please sign in to comment.