Skip to content

Commit

Permalink
Merge pull request #28 from centreon/replacing-msg-in-influxdb
Browse files Browse the repository at this point in the history
Replacing msg in influxdb
  • Loading branch information
jbrouze committed Jun 25, 2020
2 parents 287ef42 + 15153f6 commit 4f0d1a6
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 345 deletions.
4 changes: 2 additions & 2 deletions src/70-influxdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ add_library(70-influxdb SHARED
${CMAKE_SOURCE_DIR}/src/70-influxdb/stream.cc
)
set_target_properties(70-influxdb PROPERTIES PREFIX "")
target_link_libraries(70-influxdb CONAN_PKG::asio)
install(TARGETS 70-influxdb DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/centreon/lib/centreon-broker)
target_link_libraries(70-influxdb CONAN_PKG::asio CONAN_PKG::fmt)
install(TARGETS 70-influxdb DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/centreon/lib/centreon-broker)
22 changes: 7 additions & 15 deletions src/70-influxdb/column.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
*/

#include "com/centreon/broker/influxdb/column.hh"
#include "com/centreon/broker/exceptions/msg.hh"
#include "com/centreon/exceptions/msg_fmt.hh"

using namespace com::centreon::exceptions;
using namespace com::centreon::broker;
using namespace com::centreon::broker::influxdb;

Expand Down Expand Up @@ -75,36 +76,28 @@ column& column::operator=(column const& c) {
*
* @return The name of this column.
*/
std::string const& column::get_name() const {
return (_name);
}
std::string const& column::get_name() const { return (_name); }

/**
* Get the value of this column.
*
* @return The value of this column.
*/
std::string const& column::get_value() const {
return (_value);
}
std::string const& column::get_value() const { return (_value); }

/**
* Is this column a flag?
*
* @return True if this column is a flag.
*/
bool column::is_flag() const {
return (_is_flag);
}
bool column::is_flag() const { return (_is_flag); }

/**
* Get the type of this column.
*
* @return The type of this column.
*/
column::type column::get_type() const {
return (_type);
}
column::type column::get_type() const { return (_type); }

/**
* Parse a string containing a type.
Expand All @@ -118,6 +111,5 @@ column::type column::parse_type(std::string const& type) {
return (string);
else if (type == "number")
return (number);
throw(exceptions::msg() << "influxdb: couldn't parse column type '" << type
<< "'");
throw msg_fmt("influxdb: couldn't parse column type '{}'", type);
}
81 changes: 48 additions & 33 deletions src/70-influxdb/factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
#include <sstream>
#include <vector>
#include "com/centreon/broker/config/parser.hh"
#include "com/centreon/broker/exceptions/msg.hh"
#include "com/centreon/exceptions/msg_fmt.hh"
#include "com/centreon/broker/influxdb/column.hh"
#include "com/centreon/broker/influxdb/connector.hh"

using namespace com::centreon::exceptions;
using namespace com::centreon::broker;
using namespace com::centreon::broker::influxdb;
using namespace json11;
Expand All @@ -49,8 +50,7 @@ static std::string find_param(config::endpoint const& cfg,
std::string const& key) {
std::map<std::string, std::string>::const_iterator it{cfg.params.find(key)};
if (cfg.params.end() == it)
throw exceptions::msg() << "influxdb: no '" << key
<< "' defined for endpoint '" << cfg.name << "'";
throw msg_fmt("influxdb: no '{}' defined for endpoint '{}'", key, cfg.name);
return it->second;
}

Expand Down Expand Up @@ -85,10 +85,10 @@ bool factory::has_endpoint(config::endpoint& cfg) const {
*
* @return Endpoint matching the given configuration.
*/
io::endpoint* factory::new_endpoint(
config::endpoint& cfg,
bool& is_acceptor,
std::shared_ptr<persistent_cache> cache) const {
io::endpoint* factory::new_endpoint(config::endpoint& cfg,
bool& is_acceptor,
std::shared_ptr<persistent_cache> cache)
const {
std::string user(find_param(cfg, "db_user"));
std::string passwd(find_param(cfg, "db_password"));
std::string addr(find_param(cfg, "db_host"));
Expand All @@ -105,9 +105,10 @@ io::endpoint* factory::new_endpoint(
ss << it->second;
ss >> port;
if (!ss.eof())
throw exceptions::msg()
<< "influxdb: couldn't parse port '" << ss.str()
<< "' defined for endpoint '" << cfg.name << "'";
throw msg_fmt(
"influxdb: couldn't parse port '{}' defined for endpoint '{}'",
ss.str(),
cfg.name);
}
}

Expand All @@ -118,23 +119,25 @@ io::endpoint* factory::new_endpoint(
if (it != cfg.params.end())
try {
queries_per_transaction = std::stoul(it->second);
} catch (std::exception const& ex) {
throw exceptions::msg()
<< "influxdb: couldn't parse queries_per_transaction '"
<< it->second << "' defined for endpoint '" << cfg.name << "'";
}
else
queries_per_transaction = 1000;
catch (std::exception const& ex) {
throw msg_fmt(
"influxdb: couldn't parse queries_per_transaction '{}' defined for "
"endpoint '{}'",
it->second,
cfg.name);
}
else queries_per_transaction = 1000;
}

auto chk_str = [](Json const& js) -> std::string {
auto chk_str = [](Json const & js)->std::string {
if (!js.is_string() || js.string_value().empty()) {
throw exceptions::msg()
<< "influxdb: couldn't get the configuration of a metric column name";
throw msg_fmt(
"influxdb: couldn't get the configuration of a metric column name");
}
return js.string_value();
};
auto chk_bool = [](std::string const& boolean) -> bool {
auto chk_bool = [](std::string const & boolean)->bool {
if (boolean == "yes" || boolean == "true")
return true;
return false;
Expand All @@ -145,14 +148,16 @@ io::endpoint* factory::new_endpoint(
std::vector<column> status_column_list;
Json const& status_columns = cfg.cfg["status_column"];
if (status_columns.is_object())
status_column_list.push_back(column(
chk_str(status_columns["name"]), chk_str(status_columns["value"]),
chk_bool(chk_str(status_columns["is_tag"])),
column::parse_type(chk_str(status_columns["type"]))));
status_column_list.push_back(
column(chk_str(status_columns["name"]),
chk_str(status_columns["value"]),
chk_bool(chk_str(status_columns["is_tag"])),
column::parse_type(chk_str(status_columns["type"]))));
else if (status_columns.is_array())
for (Json const& object : status_columns.array_items())
status_column_list.push_back(
column(chk_str(object["name"]), chk_str(object["value"]),
column(chk_str(object["name"]),
chk_str(object["value"]),
chk_bool(chk_str(object["is_tag"])),
column::parse_type(chk_str(object["type"]))));

Expand All @@ -161,22 +166,32 @@ io::endpoint* factory::new_endpoint(
std::vector<column> metric_column_list;
Json const& metric_columns = cfg.cfg["metrics_column"];
if (metric_columns.is_object())
metric_column_list.push_back(column(
chk_str(metric_columns["name"]), chk_str(metric_columns["value"]),
chk_bool(chk_str(metric_columns["is_tag"])),
column::parse_type(chk_str(metric_columns["type"]))));
metric_column_list.push_back(
column(chk_str(metric_columns["name"]),
chk_str(metric_columns["value"]),
chk_bool(chk_str(metric_columns["is_tag"])),
column::parse_type(chk_str(metric_columns["type"]))));
else if (metric_columns.is_array())
for (Json const& object : metric_columns.array_items())
metric_column_list.push_back(
column(chk_str(object["name"]), chk_str(object["value"]),
column(chk_str(object["name"]),
chk_str(object["value"]),
chk_bool(chk_str(object["is_tag"])),
column::parse_type(chk_str(object["type"]))));

// Connector.
std::unique_ptr<influxdb::connector> c(new influxdb::connector);
c->connect_to(user, passwd, addr, port, db, queries_per_transaction,
status_timeseries, status_column_list, metric_timeseries,
metric_column_list, cache);
c->connect_to(user,
passwd,
addr,
port,
db,
queries_per_transaction,
status_timeseries,
status_column_list,
metric_timeseries,
metric_column_list,
cache);
is_acceptor = false;
return c.release();
}
92 changes: 51 additions & 41 deletions src/70-influxdb/influxdb12.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
#include "com/centreon/broker/influxdb/influxdb12.hh"
#include <iterator>
#include <vector>
#include "com/centreon/broker/exceptions/msg.hh"
#include "com/centreon/exceptions/msg_fmt.hh"
#include "com/centreon/broker/logging/logging.hh"
#include "com/centreon/broker/misc/string.hh"

using namespace asio;
using namespace com::centreon::exceptions;
using namespace com::centreon::broker::influxdb;

static const char* query_footer = "\n";
Expand All @@ -46,8 +47,8 @@ influxdb12::influxdb12(std::string const& user,
logging::debug(logging::medium)
<< "influxdb: connecting using 1.2 Line Protocol";
_connect_socket();
_create_queries(user, passwd, db, status_ts, status_cols, metric_ts,
metric_cols);
_create_queries(
user, passwd, db, status_ts, status_cols, metric_ts, metric_cols);
}

/**
Expand All @@ -58,9 +59,7 @@ influxdb12::~influxdb12() {}
/**
* Clear the query.
*/
void influxdb12::clear() {
_query.clear();
}
void influxdb12::clear() { _query.clear(); }

/**
* Write a metric to the query.
Expand Down Expand Up @@ -106,10 +105,12 @@ void influxdb12::commit() {

asio::write(_socket, buffer(final_query), asio::transfer_all(), err);
if (err)
throw exceptions::msg()
<< "influxdb: couldn't commit data to InfluxDB with address '"
<< _socket.remote_endpoint().address().to_string() << "' and port '"
<< _socket.remote_endpoint().port() << "': " << err.message();
throw msg_fmt(
"influxdb: couldn't commit data to InfluxDB with address '{}' and port "
"'{}': {}",
_socket.remote_endpoint().address().to_string(),
_socket.remote_endpoint().port(),
err.message());
// Receive the server answer.

std::string answer;
Expand All @@ -118,18 +119,20 @@ void influxdb12::commit() {
do {
answer.resize(read_size);

total_read += _socket.read_some(asio::buffer(&answer[total_read], read_size - total_read), err);
total_read += _socket.read_some(
asio::buffer(&answer[total_read], read_size - total_read), err);
if (total_read == read_size)
total_read += 2048;

answer.resize(total_read);

if (err)
throw exceptions::msg()
<< "influxdb: couldn't receive InfluxDB answer with address '"
<< _socket.remote_endpoint().address().to_string() << "' and port '"
<< _socket.remote_endpoint().port() << "': " << err.message();

throw msg_fmt(
"influxdb: couldn't receive InfluxDB answer with address '{}' and "
"port '{}': {}",
_socket.remote_endpoint().address().to_string(),
_socket.remote_endpoint().port(),
err.message());

} while (!_check_answer_string(answer));
_socket.shutdown(ip::tcp::socket::shutdown_both);
Expand Down Expand Up @@ -166,14 +169,21 @@ void influxdb12::_connect_socket() {
}

if (err) {
throw exceptions::msg()
<< "influxdb: couldn't connect to InfluxDB with address '" << _host
<< "' and port '" << _port << "': " << err.message();
throw msg_fmt(
"influxdb: couldn't connect to InfluxDB with address '{}' and port "
"'{}': {}",
_host,
_port,
err.message());
}
} catch (std::system_error const& se) {
throw exceptions::msg()
<< "influxdb: couldn't connect to InfluxDB with address '" << _host
<< "' and port '" << _port << "': " << se.what();
}
catch (std::system_error const& se) {
throw msg_fmt(
"influxdb: couldn't connect to InfluxDB with address '{}' and port "
"'{}': {}",
_host,
_port,
se.what());
}
}

Expand All @@ -199,30 +209,30 @@ bool influxdb12::_check_answer_string(std::string const& ans) {
std::istringstream iss(first_line_str);
std::vector<std::string> split;
std::copy(std::istream_iterator<std::string>(iss),
std::istream_iterator<std::string>(), std::back_inserter(split));
std::istream_iterator<std::string>(),
std::back_inserter(split));

if (split.size() < 3)
throw exceptions::msg()
<< "influxdb: unrecognizable HTTP header for '"
<< _socket.remote_endpoint().address().to_string() << "' and port '"
<< _socket.remote_endpoint().port() << "': got '" << first_line_str
<< "'";
throw msg_fmt(
"influxdb: unrecognizable HTTP header for '{}' and port '{}': got '{}'",
_socket.remote_endpoint().address().to_string(),
_socket.remote_endpoint().port(),
first_line_str);

if ((split[0] == "HTTP/1.0") && (split[1] == "204") && (split[2] == "No") &&
(split[3] == "Content"))
return true;
else if (ans.find("partial write: points beyond retention policy dropped") !=
std::string::npos) {
logging::info(logging::medium) << "influxdb: sending points beyond "
"Influxdb database configured "
"retention policy";
"Influxdb database configured "
"retention policy";
return true;
}
else
throw exceptions::msg()
<< "influxdb: got an error from '"
<< _socket.remote_endpoint().address().to_string() << "' and port '"
<< _socket.remote_endpoint().port() << "': '" << ans << "'";
} else
throw msg_fmt("influxdb: got an error from '{}' and port '{}': '{}'",
_socket.remote_endpoint().address().to_string(),
_socket.remote_endpoint().port(),
ans);
}

/**
Expand Down Expand Up @@ -252,8 +262,8 @@ void influxdb12::_create_queries(std::string const& user,
_post_header.append("POST ").append(base_url).append(" HTTP/1.0\n");

// Create protocol objects.
_status_query = line_protocol_query(status_ts, status_cols,
line_protocol_query::status, _cache);
_metric_query = line_protocol_query(metric_ts, metric_cols,
line_protocol_query::metric, _cache);
_status_query = line_protocol_query(
status_ts, status_cols, line_protocol_query::status, _cache);
_metric_query = line_protocol_query(
metric_ts, metric_cols, line_protocol_query::metric, _cache);
}
Loading

0 comments on commit 4f0d1a6

Please sign in to comment.