Skip to content

Commit

Permalink
in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Maaown committed Dec 14, 2021
1 parent da6179e commit 9bc1623
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 137 deletions.
16 changes: 10 additions & 6 deletions centreon-broker/core/inc/com/centreon/broker/broker_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,18 @@ class broker_impl final : public Broker::Service {
GenericString* response) override;

grpc::Status GetSqlConnectionStats(grpc::ServerContext* context,
const GenericInt* request,
SqlConnectionStats* response) override;
const GenericInt* request,
SqlConnectionStats* response) override;
grpc::Status GetAllSqlConnectionsStats(
grpc::ServerContext* context,
const ::google::protobuf::Empty* /*request*/,
AllSqlConnectionsStats* response) override;
grpc::Status GetConflictManagerStats(grpc::ServerContext* context,
const ::google::protobuf::Empty* request,
ConflictManagerStats* response) override;
const ::google::protobuf::Empty* request,
ConflictManagerStats* response) override;
grpc::Status GetSqlConnectionSize(grpc::ServerContext* context,
const ::google::protobuf::Empty* request,
GenericSize* response) override;
const ::google::protobuf::Empty* request,
GenericSize* response) override;
grpc::Status GetNumModules(grpc::ServerContext* context,
const ::google::protobuf::Empty* /*request*/,
GenericSize* response) override;
Expand Down
16 changes: 12 additions & 4 deletions centreon-broker/core/inc/com/centreon/broker/mysql_connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ class mysql_connection {
std::atomic_int _tasks_count;
bool _need_commit;

bool _isConnected;
std::chrono::time_point<std::chrono::steady_clock> _startPoint;

std::unordered_map<uint32_t, MYSQL_STMT*> _stmt;
std::unordered_map<uint32_t, std::string> _stmt_query;

Expand All @@ -95,6 +92,14 @@ class mysql_connection {
int _port;
std::atomic<connection_state> _state;

// Intermediate variables used to fill _stats
// _isConnected indicate the state of the connection
// => false : not connected
// => true : connected
// _switchPoint holds the timestamp of the last time _isConnected switched
bool _is_connected;
std::time_t _switch_point;

SqlConnectionStats* _stats;
std::time_t _clk;
uint32_t _qps;
Expand Down Expand Up @@ -129,7 +134,7 @@ class mysql_connection {
void _prepare_connection();
void _clear_connection();

void updateStats(void) noexcept;
void update_stats(void) noexcept;

public:
/**************************************************************************/
Expand Down Expand Up @@ -174,6 +179,9 @@ class mysql_connection {
void clear_error();
std::string get_error_message();

bool is_connected(void) const noexcept;
std::time_t get_switch_point(void) const noexcept;

/**
* @brief Create an error on the connection. All error created as this, is a
* fatal error that will throw an exception later.
Expand Down
3 changes: 2 additions & 1 deletion centreon-broker/core/inc/com/centreon/broker/stats/center.hh
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class center {
SqlConnectionStats* register_mysql_connection();
ConflictManagerStats* register_conflict_manager();
bool unregister_mysql_connection(SqlConnectionStats* connection);
void get_sql_connection_stats(uint32_t index, SqlConnectionStats* response);
bool get_sql_connection_stats(uint32_t index, SqlConnectionStats* response);
void get_all_sql_connections_stats(AllSqlConnectionsStats *response);
void get_conflict_manager_stats(ConflictManagerStats* response);
// bool unregister_endpoint(const std::string& name);
// bool unregister_feeder(EndpointStats* ep_stats, const std::string& name);
Expand Down
11 changes: 9 additions & 2 deletions centreon-broker/core/src/broker.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/duration.proto";

package com.centreon.broker;

Expand All @@ -11,6 +10,7 @@ service Broker {
// stats
rpc GetGenericStats(google.protobuf.Empty) returns (GenericString) {}
rpc GetSqlConnectionStats(GenericInt) returns (SqlConnectionStats) {}
rpc GetAllSqlConnectionsStats(google.protobuf.Empty) returns (AllSqlConnectionsStats) {}
rpc GetSqlConnectionSize(google.protobuf.Empty) returns (GenericSize) {}
rpc GetConflictManagerStats(google.protobuf.Empty) returns (ConflictManagerStats) {}

Expand Down Expand Up @@ -77,7 +77,14 @@ message EngineStats {
message SqlConnectionStats {
int32 waiting_tasks = 1;
bool is_connected = 2;
google.protobuf.Duration uptime = 3;
oneof since {
uint64 up_since = 3;
uint64 down_since = 4;
}
}

message AllSqlConnectionsStats {
repeated SqlConnectionStats connections = 1;
}

message ConflictManagerStats {
Expand Down
40 changes: 25 additions & 15 deletions centreon-broker/core/src/broker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,29 +190,39 @@ grpc::Status broker_impl::GetGenericStats(
}

grpc::Status broker_impl::GetSqlConnectionStats(grpc::ServerContext* context
__attribute__((unused)),
const GenericInt* request,
SqlConnectionStats* response) {
__attribute__((unused)),
const GenericInt* request,
SqlConnectionStats* response) {
uint32_t index = request->value();
stats::center::instance().get_sql_connection_stats(index, response);
return grpc::Status::OK;
auto status =
stats::center::instance().get_sql_connection_stats(index, response);
return status ? grpc::Status::OK
: grpc::Status(
grpc::StatusCode::NOT_FOUND,
std::string("no sql connection stats found for index: " +
std::to_string(index)));
}

grpc::Status broker_impl::GetAllSqlConnectionsStats(
grpc::ServerContext* context __attribute__((unused)),
const ::google::protobuf::Empty* request __attribute__((unused)),
AllSqlConnectionsStats* response) {
stats::center::instance().get_all_sql_connections_stats(response);
return grpc::Status::OK;
}

grpc::Status broker_impl::GetConflictManagerStats(grpc::ServerContext* context
__attribute__((unused)),
const ::google::protobuf::Empty* request
__attribute__((unused)),
ConflictManagerStats* response) {
grpc::Status broker_impl::GetConflictManagerStats(
grpc::ServerContext* context __attribute__((unused)),
const ::google::protobuf::Empty* request __attribute__((unused)),
ConflictManagerStats* response) {
stats::center::instance().get_conflict_manager_stats(response);
return grpc::Status::OK;
}

grpc::Status broker_impl::GetSqlConnectionSize(grpc::ServerContext* context
__attribute__((unused)),
const ::google::protobuf::Empty* request
__attribute__((unused)),
GenericSize* response) {
grpc::Status broker_impl::GetSqlConnectionSize(
grpc::ServerContext* context __attribute__((unused)),
const ::google::protobuf::Empty* request __attribute__((unused)),
GenericSize* response) {
stats::center::instance().get_sql_connection_size(response);
return grpc::Status::OK;
}
91 changes: 48 additions & 43 deletions centreon-broker/core/src/mysql_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

#include <cstring>

#include <google/protobuf/util/time_util.h>

#include "com/centreon/broker/config/applier/init.hh"
#include "com/centreon/broker/log_v2.hh"
#include "com/centreon/broker/mysql_manager.hh"
Expand Down Expand Up @@ -98,25 +96,23 @@ void mysql_connection::_clear_connection() {
}
_stmt.clear();
mysql_close(_conn);
_is_connected = false;
_switch_point = std::time(nullptr);
update_stats();
}

/**
* @brief Fill statistics if it happened more than 1 second ago
*/
void mysql_connection::updateStats(void) noexcept {
void mysql_connection::update_stats(void) noexcept {
auto now(std::time(nullptr));
if ((now - _clk) > 1000) {
_clk = now;
std::chrono::time_point<std::chrono::steady_clock> nowPoint(
std::chrono::steady_clock::now());
stats::center::instance().execute([this, nowPoint] {
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
nowPoint - _startPoint)
.count();
stats::center::instance().execute([this]() {
_stats->set_waiting_tasks(_tasks_count);
_stats->set_is_connected(_isConnected);
*_stats->mutable_uptime() =
google::protobuf::util::TimeUtil::MillisecondsToDuration(duration);
_stats->set_is_connected(_is_connected);
_is_connected ? _stats->set_up_since(_switch_point) :
_stats->set_down_since(_switch_point);
});
}
}
Expand All @@ -135,6 +131,9 @@ bool mysql_connection::_try_to_reconnect() {
_conn = mysql_init(nullptr);
if (!_conn) {
log_v2::sql()->error("mysql_connection: reconnection failed.");
_is_connected = false;
_switch_point = std::time(nullptr);
update_stats();
return false;
}

Expand All @@ -147,6 +146,9 @@ bool mysql_connection::_try_to_reconnect() {
CLIENT_FOUND_ROWS)) {
log_v2::sql()->error(
"mysql_connection: The mysql/mariadb database seems not started.");
_is_connected = false;
_switch_point = std::time(nullptr);
update_stats();
return false;
}

Expand Down Expand Up @@ -526,6 +528,16 @@ std::string mysql_connection::get_error_message() {
return _error.get_message();
}

bool mysql_connection::is_connected(void) const noexcept
{
return _is_connected;
}

std::time_t mysql_connection::get_switch_point(void) const noexcept
{
return _switch_point;
}

/**
* @brief Disable the connection's error. Therefore, the connection is no more
* in error.
Expand Down Expand Up @@ -623,23 +635,11 @@ void mysql_connection::_run() {
std::list<std::unique_ptr<database::mysql_task>> tasks_list;
if (!_tasks_list.empty()) {
std::swap(_tasks_list, tasks_list);
//
updateStats();
/*
stats::center::instance().update(&SqlConnectionStats::set_waiting_tasks,
_stats,
static_cast<int>(_tasks_count));
*/
update_stats();
assert(_tasks_list.empty());
} else {
_tasks_count = 0;
//
updateStats();
/*
stats::center::instance().update(&SqlConnectionStats::set_waiting_tasks,
_stats,
static_cast<int>(_tasks_count));
*/
update_stats();
_tasks_condition.wait(
lock, [this] { return _finish_asked || !_tasks_list.empty(); });
if (_tasks_list.empty()) {
Expand All @@ -652,10 +652,18 @@ void mysql_connection::_run() {

log_v2::sql()->trace("SQL: performing mysql_ping.");
if (mysql_ping(_conn)) {
if (!_try_to_reconnect())
if (!_try_to_reconnect()) {
log_v2::sql()->error("SQL: Reconnection failed.");
} else
_is_connected = false;
_switch_point = std::time(nullptr);
}
} else {
log_v2::sql()->trace("SQL: connection always alive");
_is_connected = true;
_switch_point = std::time(nullptr);
}

update_stats();

time_t start = time(nullptr);
for (auto& task : tasks_list) {
Expand All @@ -669,13 +677,7 @@ void mysql_connection::_run() {

if (time(nullptr) - start != 0) {
start = time(nullptr);
//
updateStats();
/*
stats::center::instance().update(
&SqlConnectionStats::set_waiting_tasks, _stats,
static_cast<int>(_tasks_count));
*/
update_stats();
}
}

Expand All @@ -696,15 +698,15 @@ mysql_connection::mysql_connection(database_config const& db_cfg)
_finish_asked(false),
_tasks_count(0),
_need_commit(false),
_isConnected(false),
_startPoint{std::chrono::steady_clock::now()},
_host(db_cfg.get_host()),
_socket(db_cfg.get_socket()),
_user(db_cfg.get_user()),
_pwd(db_cfg.get_password()),
_name(db_cfg.get_name()),
_port(db_cfg.get_port()),
_state(not_started),
_is_connected(false),
_switch_point{std::time(nullptr)},
_stats{stats::center::instance().register_mysql_connection()},
_clk{std::time(nullptr)},
_qps(db_cfg.get_queries_per_transaction()) {
Expand All @@ -714,17 +716,17 @@ mysql_connection::mysql_connection(database_config const& db_cfg)
_start_condition.wait(lck, [this] { return _state != not_started; });
if (_state == finished) {
_thread->join();
_switch_point = std::time(nullptr);
_is_connected = false;
log_v2::sql()->error("mysql_connection: error while starting connection");
update_stats();
throw msg_fmt("mysql_connection: error while starting connection");
}
pthread_setname_np(_thread->native_handle(), "mysql_connect");
_switch_point = std::time(nullptr);
_is_connected = true;
log_v2::sql()->info("mysql_connection: connection started");

updateStats();
// stats::center::instance().update(&SqlConnectionStats::set_waiting_tasks,
// _stats, 0);
// stats::center::instance().update(&SqlConnectionStats::set_uptime, _stats,
// 0);
update_stats();
}

/**
Expand All @@ -735,6 +737,8 @@ mysql_connection::~mysql_connection() {
log_v2::sql()->info("mysql_connection: finished");
finish();
_thread->join();
stats::center::instance().unregister_mysql_connection(_stats);
update_stats();
}

void mysql_connection::_push(std::unique_ptr<mysql_task>&& q) {
Expand All @@ -745,6 +749,7 @@ void mysql_connection::_push(std::unique_ptr<mysql_task>&& q) {
_tasks_list.push_back(std::move(q));
++_tasks_count;
_tasks_condition.notify_all();
update_stats();
}

/**
Expand Down
12 changes: 11 additions & 1 deletion centreon-broker/core/src/mysql_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,15 @@ std::map<std::string, std::string> mysql_manager::get_stats() {
key.replace(key_len, std::string::npos, std::to_string(i));
retval.insert(std::make_pair(key, std::to_string(_stats_counts[i])));
}
retval.insert(std::make_pair("size", std::to_string(_connection.size())));
for (std::vector<std::shared_ptr<mysql_connection>>::iterator conn = _connection.begin();
conn != _connection.end(); ++conn) {
std::string conn_name = "connection_" + std::to_string(std::distance(_connection.begin(), conn));
std::stringstream ss;
ss << "connected: " << std::boolalpha << (*conn)->is_connected();
ss << ", since: " << std::to_string((*conn)->get_switch_point());
ss << ", tasks: " << std::to_string((*conn)->get_tasks_count());
retval.insert(std::make_pair(conn_name, ss.str()));
}
return retval;
}
}
Loading

0 comments on commit 9bc1623

Please sign in to comment.