Skip to content

Commit

Permalink
MON-17390 less lock
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-christophe81 committed Mar 17, 2023
1 parent ed08c95 commit c1031fc
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 68 deletions.
5 changes: 4 additions & 1 deletion broker/core/inc/com/centreon/broker/mysql_connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class mysql_connection {
/**************************************************************************/
bool _server_error(int code) const;
void _run();
void _process_tasks(std::list<std::unique_ptr<database::mysql_task>>& task);
void _process_while_empty_task(
std::list<std::unique_ptr<database::mysql_task>>& task);

std::string _get_stack();
void _query(database::mysql_task* t);
void _query_res(database::mysql_task* t);
Expand All @@ -124,7 +128,6 @@ class mysql_connection {
void _fetch_row_sync(database::mysql_task* task);
void _get_version(database::mysql_task* t);
void _push(std::unique_ptr<database::mysql_task>&& q);
void _debug(MYSQL_BIND* bind, uint32_t size);
bool _try_to_reconnect();

static void (mysql_connection::*const _task_processing_table[])(
Expand Down
161 changes: 94 additions & 67 deletions broker/core/src/mysql_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ bool mysql_connection::_try_to_reconnect() {
if (!_conn) {
SPDLOG_LOGGER_ERROR(log_v2::sql(),
"mysql_connection: reconnection failed.");
set_error_message("mysql_connection: reconnection failed.");
return false;
}

Expand All @@ -158,6 +159,8 @@ bool mysql_connection::_try_to_reconnect() {
SPDLOG_LOGGER_ERROR(
log_v2::sql(),
"mysql_connection: The mysql/mariadb database seems not started.");
set_error_message(
"mysql_connection: The mysql/mariadb database seems not started.");
return false;
}
_last_access = std::time(nullptr);
Expand Down Expand Up @@ -300,7 +303,6 @@ void mysql_connection::_commit(mysql_task* t) {
std::string err_msg(
fmt::format("Error during commit: {}", ::mysql_error(_conn)));
SPDLOG_LOGGER_ERROR(log_v2::sql(), "mysql_connection: {}", err_msg);
set_error_message(err_msg);
} else {
/* No more queries are waiting for a commit now. */
_need_commit = false;
Expand Down Expand Up @@ -755,98 +757,123 @@ void mysql_connection::_run() {
std::chrono::system_clock::time_point last_commit =
std::chrono::system_clock::now();

std::unique_lock<std::mutex> lock(_tasks_m);
bool reconnect_failed_logged = false;
std::list<std::unique_ptr<database::mysql_task>> tasks_list;
while (_state == running || !_tasks_list.empty()) {
if (tasks_list.empty()) {
std::unique_lock<std::mutex> lock(_tasks_m);
_tasks_list.swap(tasks_list);
}

if (_error.is_active()) {
lock.unlock();
if (!_try_to_reconnect()) {
if (!reconnect_failed_logged) {
SPDLOG_LOGGER_ERROR(log_v2::sql(), "SQL: Reconnection failed.");
reconnect_failed_logged = true;
}
std::this_thread::sleep_for(std::chrono::seconds(10));
lock.lock();
continue;
} else {
reconnect_failed_logged = false;
}
lock.lock();
}
mysql_task* to_execute;
if (!_tasks_list.empty()) {
to_execute = _tasks_list.begin()->get();
} else {
/* We are waiting for some activity, nothing to do for now it is time
* to make some ping */
while (!_tasks_condition.wait_for(
lock, std::chrono::seconds(5),
[this] { return _finish_asked || !_tasks_list.empty(); })) {
if (time(nullptr) > _last_commit + _max_second_commit_delay) {
_commit(nullptr);
}
_update_stats();
if (!tasks_list.empty()) {
_process_tasks(tasks_list);
} else {
_process_while_empty_task(tasks_list);
}
}
}
}
_clear_connection();
mysql_thread_end();
log_v2::core()->trace("mysql connection main loop finished.");
}

std::time_t now = std::time(nullptr);
/**
* @brief when we excecute thsi method, we are connected and there are tasks to
* execute
*
* @param tasks_list in out list that will be empty by task execution
*/
void mysql_connection::_process_tasks(
std::list<std::unique_ptr<database::mysql_task>>& tasks_list) {
while (!tasks_list.empty()) {
--_tasks_count;
_update_stats();
database::mysql_task* task = tasks_list.begin()->get();

if (_tasks_list.empty())
_state = finished;
else if (now >= _last_access + 30) {
lock.unlock();
SPDLOG_LOGGER_TRACE(
log_v2::sql(),
"mysql_connection {:p} SQL: performing mysql_ping.",
static_cast<const void*>(this));
if (mysql_ping(_conn)) {
if (!_try_to_reconnect())
SPDLOG_LOGGER_ERROR(
log_v2::sql(),
"mysql_connection {:p} SQL: Reconnection failed.",
static_cast<const void*>(this));
} else {
SPDLOG_LOGGER_TRACE(
log_v2::sql(),
"mysql_connection {:p} SQL: connection always alive",
static_cast<const void*>(this));
_last_access = now;
}
lock.lock();
} else
SPDLOG_LOGGER_TRACE(log_v2::sql(),
"mysql_connection {:p} SQL: last access to the "
"database for this connection for {}s",
static_cast<const void*>(this),
now - _last_access);
continue;
if (task->type <
sizeof(_task_processing_table) / sizeof(_task_processing_table[0])) {
(this->*(_task_processing_table[task->type]))(task);
if (time(nullptr) > _last_commit + _max_second_commit_delay) {
_commit(nullptr);
}
} else {
SPDLOG_LOGGER_ERROR(log_v2::sql(),
"mysql_connection {:p}: Error type not managed...",
static_cast<const void*>(this));
}

lock.unlock();
if (!_error.is_active()) {
tasks_list.pop_front();
_last_access = time(nullptr);
} else {
return;
}
}
}

--_tasks_count;
_update_stats();
/**
* @brief wait for a task to execute and perform a mysql ping after 30s of
* inactivity to ensure connection is still alive
*
* @param tasks_list
*/
void mysql_connection::_process_while_empty_task(
std::list<std::unique_ptr<database::mysql_task>>& tasks_list) {
std::unique_lock<std::mutex> lock(_tasks_m);
/* We are waiting for some activity, nothing to do for now it is time
* to make some ping */
while (!_tasks_condition.wait_for(lock, std::chrono::seconds(5), [this] {
return _finish_asked || !_tasks_list.empty();
})) {
if (time(nullptr) > _last_commit + _max_second_commit_delay) {
_commit(nullptr);
}
_update_stats();
}

std::time_t now = std::time(nullptr);

if (to_execute->type <
sizeof(_task_processing_table) / sizeof(_task_processing_table[0])) {
(this->*(_task_processing_table[to_execute->type]))(to_execute);
if (time(nullptr) > _last_commit + _max_second_commit_delay) {
_commit(nullptr);
if (_tasks_list.empty()) {
_state = finished;
} else {
tasks_list.swap(_tasks_list);
lock.unlock();
if (now >= _last_access + 30) {
SPDLOG_LOGGER_TRACE(log_v2::sql(),
"mysql_connection {:p} SQL: performing mysql_ping.",
static_cast<const void*>(this));
if (mysql_ping(_conn)) {
if (!_try_to_reconnect()) {
SPDLOG_LOGGER_ERROR(log_v2::sql(),
"mysql_connection {:p} SQL: Reconnection failed.",
static_cast<const void*>(this));
}
} else {
SPDLOG_LOGGER_ERROR(log_v2::sql(),
"mysql_connection {:p}: Error type not managed...",
static_cast<const void*>(this));
}

lock.lock();
if (!_error.is_active()) {
_tasks_list.pop_front();
SPDLOG_LOGGER_TRACE(
log_v2::sql(), "mysql_connection {:p} SQL: connection always alive",
static_cast<const void*>(this));
_last_access = now;
}
} else {
SPDLOG_LOGGER_TRACE(log_v2::sql(),
"mysql_connection {:p} SQL: last access to the "
"database for this connection for {}s",
static_cast<const void*>(this), now - _last_access);
}
}
_clear_connection();
mysql_thread_end();
log_v2::core()->trace("mysql connection main loop finished.");
}

/******************************************************************************/
Expand Down

0 comments on commit c1031fc

Please sign in to comment.