Skip to content

Commit

Permalink
Merge pull request #874 from redboltz/fix_qos2_handle_logic
Browse files Browse the repository at this point in the history
Fixed broker side qos2 exactly once logic.
  • Loading branch information
redboltz authored Oct 2, 2021
2 parents 5e1d6b1 + da08be9 commit c24cb62
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 44 deletions.
20 changes: 0 additions & 20 deletions include/mqtt/broker/broker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1483,10 +1483,6 @@ class broker_t {
);
break;
case qos::exactly_once: {
// const_cast is appropriate here
// See https://github.com/boostorg/multi_index/issues/50
auto& ss = const_cast<session_state&>(*it);
ss.exactly_once_start(packet_id.value());
ep.async_pubrec(
packet_id.value(),
v5::pubrec_reason_code::success,
Expand All @@ -1506,17 +1502,6 @@ class broker_t {
}
};

if (packet_id) {
if (pubopts.get_qos() == qos::exactly_once &&
it->exactly_once_processing(packet_id.value())) {
MQTT_LOG("mqtt_broker", info)
<< MQTT_ADD_VALUE(address, &ep)
<< "receive already processed publish pid:" << packet_id.value();
send_pubres();
return true;
}
}

v5::properties forward_props;

for (auto&& p : props) {
Expand Down Expand Up @@ -1655,11 +1640,6 @@ class broker_t {
// erased from sessions_
if (it == idx.end()) return true;

// const_cast is appropriate here
// See https://github.com/boostorg/multi_index/issues/50
auto& ss = const_cast<session_state&>(*it);
ss.exactly_once_finish(packet_id);

auto& ep = *spep;

switch (ep.get_protocol_version()) {
Expand Down
27 changes: 3 additions & 24 deletions include/mqtt/broker/session_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ struct session_state {
);
}
);
qos2_publish_processed_ = con_->get_qos2_publish_handled_pids();
qos2_publish_handled_ = con_->get_qos2_publish_handled_pids();
con_.reset();

if (session_expiry_interval_ &&
Expand Down Expand Up @@ -293,30 +293,11 @@ struct session_state {
std::lock_guard<mutex> g(mtx_offline_messages_);
offline_messages_.clear();
}
{
std::lock_guard<mutex> g(mtx_qos2_publish_processed_);
qos2_publish_processed_.clear();
}
shared_targets_.erase(*this);
unsubscribe_all();
if (con_) con_->async_force_disconnect();
}

void exactly_once_start(packet_id_t packet_id) {
std::lock_guard<mutex> g(mtx_qos2_publish_processed_);
qos2_publish_processed_.insert(packet_id);
}

bool exactly_once_processing(packet_id_t packet_id) const {
std::shared_lock<mutex> g(mtx_qos2_publish_processed_);
return qos2_publish_processed_.find(packet_id) != qos2_publish_processed_.end();
}

void exactly_once_finish(packet_id_t packet_id) {
std::lock_guard<mutex> g(mtx_qos2_publish_processed_);
qos2_publish_processed_.erase(packet_id);
}

template <typename PublishRetainHandler>
void subscribe(
buffer share_name,
Expand Down Expand Up @@ -510,11 +491,12 @@ struct session_state {
if (clean_start) {
// send previous will
send_will_impl();
qos2_publish_handled_.clear();
}
else {
// cancel will
clear_will();
con->restore_qos2_publish_handled_pids(force_move(qos2_publish_processed_));
con->restore_qos2_publish_handled_pids(qos2_publish_handled_);
}
con_ = force_move(con);
}
Expand Down Expand Up @@ -584,9 +566,6 @@ struct session_state {
mutable mutex mtx_inflight_messages_;
inflight_messages inflight_messages_;

mutable mutex mtx_qos2_publish_processed_;
std::set<packet_id_t> qos2_publish_processed_;

mutable mutex mtx_offline_messages_;
offline_messages offline_messages_;

Expand Down

0 comments on commit c24cb62

Please sign in to comment.