Skip to content

Commit

Permalink
Fixed #856.
Browse files Browse the repository at this point in the history
Implemented clean shutdown.
  • Loading branch information
redboltz committed Sep 12, 2021
1 parent 11d26fa commit 015917f
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 59 deletions.
12 changes: 12 additions & 0 deletions include/mqtt/broker/broker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1319,9 +1319,15 @@ class broker_t {
auto& ss = const_cast<session_state&>(*it);
do_send_will(ss);
if (rc) {
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, spep.get())
<< "disconnect_and_force_disconnect(async) cid:" << ss.client_id();
disconnect_and_force_disconnect(spep, rc.value());
}
else {
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, spep.get())
<< "force_disconnect(async) cid:" << ss.client_id();
force_disconnect(spep);
}
idx.erase(it);
Expand All @@ -1334,9 +1340,15 @@ class broker_t {
[&](session_state& ss) {
do_send_will(ss);
if (rc) {
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, spep.get())
<< "disconnect_and_force_disconnect(async) cid:" << ss.client_id();
disconnect_and_force_disconnect(spep, rc.value());
}
else {
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, spep.get())
<< "force_disconnect(async) cid:" << ss.client_id();
force_disconnect(spep);
}
// become_offline updates index
Expand Down
4 changes: 4 additions & 0 deletions include/mqtt/broker/session_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ struct session_state {
}

void clean() {
MQTT_LOG("mqtt_broker", trace)
<< MQTT_ADD_VALUE(address, this)
<< "clean";
{
std::lock_guard<mutex> g(mtx_inflight_messages_);
inflight_messages_.clear();
Expand All @@ -296,6 +299,7 @@ struct session_state {
}
shared_targets_.erase(*this);
unsubscribe_all();
if (con_) con_->async_force_disconnect();
}

void exactly_once_start(packet_id_t packet_id) {
Expand Down
2 changes: 2 additions & 0 deletions include/mqtt/constant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define MQTT_CONSTANT_HPP

#include <cstddef>
#include <chrono>

#include <mqtt/namespace.hpp>
#include <mqtt/type.hpp>
Expand All @@ -21,6 +22,7 @@ static constexpr std::size_t packet_size_no_limit =
4 + // remaining length
128 * 128 * 128 * 128; // maximum value of remainin length
static constexpr receive_maximum_t receive_maximum_max = 0xffff;
static constexpr auto shutdown_timeout = std::chrono::seconds(3);

} // namespace MQTT_NS

Expand Down
148 changes: 100 additions & 48 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
endpoint(as::io_context& ioc, protocol_version version = protocol_version::undetermined, bool async_operation = false)
:async_operation_{async_operation},
version_(version),
tim_pingresp_(ioc)
tim_pingresp_(ioc),
tim_shutdown_(ioc)
{
MQTT_LOG("mqtt_api", info)
<< MQTT_ADD_VALUE(address, this)
Expand All @@ -218,7 +219,8 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
connected_(true),
async_operation_{async_operation},
version_(version),
tim_pingresp_(ioc)
tim_pingresp_(ioc),
tim_shutdown_(ioc)
{
MQTT_LOG("mqtt_api", info)
<< MQTT_ADD_VALUE(address, this)
Expand Down Expand Up @@ -949,7 +951,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
MQTT_LOG("mqtt_api", info)
<< MQTT_ADD_VALUE(address, this)
<< "start_session";
shutdowned_ = false;
shutdown_requested_ = false;
async_read_control_packet_type(force_move(session_life_keeper));
}

Expand Down Expand Up @@ -5043,24 +5045,22 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
bool handle_close_or_error(error_code ec) {
if (connected_) {
if (!ec) return false;
connected_ = false;
mqtt_connected_ = false;
{
boost::system::error_code ignored_ec;
socket_->close(ignored_ec);
}
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "handle_close_or_error call chutdown";
shutdown(socket());
}

connect_requested_ = false;
clean_sub_unsub_inflight();
if (disconnect_requested_) {
disconnect_requested_ = false;
connect_requested_ = false;
clean_sub_unsub_inflight();
on_close();
return true;
disconnect_requested_ = false;
}
else {
if (!ec) ec = boost::system::errc::make_error_code(boost::system::errc::not_connected);
on_error(ec);
}
disconnect_requested_ = false;
connect_requested_ = false;
if (!ec) ec = boost::system::errc::make_error_code(boost::system::errc::not_connected);
clean_sub_unsub_inflight_on_error(ec);
return true;
}

Expand Down Expand Up @@ -5260,36 +5260,82 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
clean_sub_unsub_inflight_on_error(boost::system::errc::make_error_code(boost::system::errc::protocol_error));
}

template <typename T>
void shutdown(T& socket) {
void shutdown(MQTT_NS::socket& s) {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "shutdown";
if (shutdowned_) {
if (shutdown_requested_) {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "already shutdowned";
return;
}
shutdowned_ = true;
connected_ = false;
shutdown_requested_ = true;
mqtt_connected_ = false;

{
boost::system::error_code ec;
socket.lowest_layer().shutdown(as::ip::tcp::socket::shutdown_both, ec);
if (async_operation_) {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "socket shutdown ec:"
<< ec.message();
<< "async_clean_shutdown_and_close";
s.async_clean_shutdown_and_close(
[this, sp = this->shared_from_this(), ssp = socket_sp_ref()](error_code ec) { // *1
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "async_clean_shutdown_and_close ec:"
<< ec.message();
tim_shutdown_.cancel();
connected_ = false;
}
);
// timeout timer set
tim_shutdown_.expires_after(shutdown_timeout);
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
tim_shutdown_.async_wait(
[this, wp = force_move(wp), ssp = socket_sp_ref()](error_code ec) mutable {
if (auto sp = wp.lock()) {
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "async_shutdown timer ec:"
<< ec.message();
if (!ec) {
// timeout
// tcp_shutdown indirectly cancel stream.async_shutdown()
// and handler is called with error.
// So captured sp at *1 is released.

// post is for applying strand
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "post force_shutdown_and_close";
sp->socket().post(
[this, sp] {
if (connected_) {
error_code ec;
socket().force_shutdown_and_close(ec);
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "force_shutdown_and_close ec:"
<< ec.message();
connected_ = false;
}
}
);
}
}
}
);
return;
}
{
boost::system::error_code ec;
socket.lowest_layer().close(ec);
else {
error_code ec;
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "clean_shutdown_and_close";
s.clean_shutdown_and_close(ec);
MQTT_LOG("mqtt_impl", trace)
<< MQTT_ADD_VALUE(address, this)
<< "socket close ec:"
<< "clean_shutdown_and_close ec:"
<< ec.message();
connected_ = false;
}
}

Expand Down Expand Up @@ -9483,7 +9529,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
std::uint16_t keep_alive_sec,
v5::properties props
) {
shutdowned_ = false;
shutdown_requested_ = false;
switch (version_) {
case protocol_version::v3_1_1:
do_sync_write(
Expand Down Expand Up @@ -10349,11 +10395,12 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
template <typename MessageVariant>
void do_sync_write(MessageVariant&& mv) {
boost::system::error_code ec;
if (!connected_) return;
on_pre_send();
total_bytes_sent_ += socket_->write(const_buffer_sequence<PacketIdBytes>(mv), ec);
// If ec is set as error, the error will be handled by async_read.
// If `handle_error(ec);` is called here, error_handler would be called twice.
if (can_send()) {
on_pre_send();
total_bytes_sent_ += socket_->write(const_buffer_sequence<PacketIdBytes>(mv), ec);
// If ec is set as error, the error will be handled by async_read.
// If `handle_error(ec);` is called here, error_handler would be called twice.
}
}

// Non blocking (async) senders
Expand All @@ -10366,7 +10413,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v5::properties props,
async_handler_t func
) {
shutdowned_ = false;
shutdown_requested_ = false;
switch (version_) {
case protocol_version::v3_1_1:
do_async_write(
Expand Down Expand Up @@ -11294,7 +11341,6 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}
if (ec || // Error is handled by async_read.
!self_->connected_) {
self_->connected_ = false;
while (!self_->queue_.empty()) {
// Handlers for outgoing packets need not be valid.
if (auto&& h = self_->queue_.front().handler()) h(ec);
Expand All @@ -11316,7 +11362,6 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}
if (ec || // Error is handled by async_read.
!self_->connected_) {
self_->connected_ = false;
while (!self_->queue_.empty()) {
// Handlers for outgoing packets need not be valid.
if(auto&& h = self_->queue_.front().handler()) h(ec);
Expand All @@ -11325,7 +11370,6 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
return;
}
if (bytes_to_transfer_ != bytes_transferred) {
self_->connected_ = false;
while (!self_->queue_.empty()) {
// Handlers for outgoing packets need not be valid.
if(auto&& h = self_->queue_.front().handler()) h(ec);
Expand Down Expand Up @@ -11407,15 +11451,17 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
socket_->post(
[this, self = this->shared_from_this(), mv = force_move(mv), func = force_move(func)]
() mutable {
if (!connected_) {
if (can_send()) {
queue_.emplace_back(force_move(mv), force_move(func));
// Only need to start async writes if there was nothing in the queue before the above item.
if (queue_.size() > 1) return;
do_async_write();
}
else {
// offline async publish is successfully finished, because there's nothing to do.
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
return;
}
queue_.emplace_back(force_move(mv), force_move(func));
// Only need to start async writes if there was nothing in the queue before the above item.
if (queue_.size() > 1) return;
do_async_write();
}
);
}
Expand Down Expand Up @@ -11503,6 +11549,10 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
publish_received_.erase(packet_id);
}

bool can_send() const {
return connected_ && ! shutdown_requested_;
}

static optional<topic_alias_t> get_topic_alias_from_prop(v5::property_variant const& prop) {
optional<topic_alias_t> val;
v5::visit_prop(
Expand Down Expand Up @@ -11548,7 +11598,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
std::shared_ptr<MQTT_NS::socket> socket_;
std::atomic<bool> connected_{false};
std::atomic<bool> mqtt_connected_{false};
std::atomic<bool> shutdowned_{false};
std::atomic<bool> shutdown_requested_{false};

std::array<char, 10> buf_;
std::uint8_t fixed_header_;
Expand Down Expand Up @@ -11583,6 +11633,8 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
as::steady_timer tim_pingresp_;
bool tim_pingresp_set_ = false;

as::steady_timer tim_shutdown_;

bool auto_map_topic_alias_send_ = false;
bool auto_replace_topic_alias_send_ = false;
mutable Mutex topic_alias_send_mtx_;
Expand Down
Loading

0 comments on commit 015917f

Please sign in to comment.