Skip to content

Commit

Permalink
Merge pull request #43 from centreon/msg_inheritance_corruption
Browse files Browse the repository at this point in the history
enh(corruption) : replacing msg inheritance by msg_fmt
  • Loading branch information
jbrouze committed Jul 3, 2020
2 parents 5f74750 + 8ab61a5 commit e2a42b2
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 60 deletions.
4 changes: 2 additions & 2 deletions include/com/centreon/broker/compression/stream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ namespace compression {
*/
class stream : public io::stream {
public:
static int const max_data_size = 100000000;
static int const max_data_size;

stream(int level = -1, size_t size = 0);
stream(stream const& other);
~stream();
stream& operator=(stream const& other);
int flush();
bool read(std::shared_ptr<io::data>& d, time_t deadline = (time_t)-1);
bool read(std::shared_ptr<io::data>& d, time_t deadline = (time_t) - 1);
void statistics(json11::Json::object& tree) const;
int write(std::shared_ptr<io::data> const& d);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,30 @@
** For more information : contact@centreon.com
*/

#ifndef CCB_EXCEPTIONS_CORRUPTION_HH
#define CCB_EXCEPTIONS_CORRUPTION_HH
#ifndef CC_EXCEPTIONS_CORRUPTION_HH
#define CC_EXCEPTIONS_CORRUPTION_HH

#include "com/centreon/broker/exceptions/msg.hh"
#include "com/centreon/broker/namespace.hh"
#include "com/centreon/exceptions/msg_fmt.hh"
#include "com/centreon/namespace.hh"

CCB_BEGIN()
CC_BEGIN()

namespace exceptions {
/**
* @class corruption corruption.hh
* "com/centreon/broker/exceptions/corruption.hh"
* "com/centreon/exceptions/corruption.hh"
* @brief Shutdown exception class.
*
* This exception is thrown when someone attemps to read from a
* stream that has been corruption.
*/
class corruption : public msg {
class corruption : public msg_fmt {
public:
corruption() = default;
corruption(corruption const&) = default;
template <typename... Args>
explicit corruption(std::string const& str, const Args&... args)
: msg_fmt(fmt::format(str, args...)) {}

corruption() = delete;
~corruption() noexcept {}
corruption& operator=(const corruption&) = delete;

Expand All @@ -45,14 +48,9 @@ class corruption : public msg {
*
* @param[in] t Data to insert.
*/
template <typename T>
corruption& operator<<(T t) noexcept {
*(misc::stringifier*)this << t;
return *this;
}
};
} // namespace exceptions

CCB_END()
CC_END()

#endif // !CCB_EXCEPTIONS_CORRUPTION_HH
#endif // !CC_EXCEPTIONS_CORRUPTION_HH
55 changes: 32 additions & 23 deletions src/ccb_core/compression/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@

#include "com/centreon/broker/compression/stream.hh"
#include "com/centreon/broker/compression/zlib.hh"
#include "com/centreon/broker/exceptions/corruption.hh"
#include "com/centreon/exceptions/corruption.hh"
#include "com/centreon/broker/exceptions/interrupt.hh"
#include "com/centreon/broker/exceptions/shutdown.hh"
#include "com/centreon/broker/exceptions/timeout.hh"
#include "com/centreon/broker/io/events.hh"
#include "com/centreon/broker/io/raw.hh"
#include "com/centreon/broker/logging/logging.hh"

using namespace com::centreon::exceptions;
using namespace com::centreon::broker;
using namespace com::centreon::broker::compression;

int const stream::max_data_size = 100000000;
/**************************************
* *
* Public Methods *
Expand Down Expand Up @@ -118,10 +120,11 @@ bool stream::read(std::shared_ptr<io::data>& data, time_t deadline) {
// Check if size is within bounds.
if ((size <= 0) || (size > max_data_size)) {
// Skip corrupted data, one byte at a time.
logging::error(logging::low)
<< "compression: " << this << " got corrupted packet size of "
<< size << " bytes, not in the 0-" << max_data_size
<< " range, skipping next byte";
logging::error(logging::low) << "compression: " << this
<< " got corrupted packet size of "
<< size << " bytes, not in the 0-"
<< max_data_size
<< " range, skipping next byte";
if (!skipped)
logging::error(logging::high) << "compression: peer " << peer()
<< " is sending corrupted data";
Expand All @@ -145,7 +148,8 @@ bool stream::read(std::shared_ptr<io::data>& data, time_t deadline) {
zlib::uncompress(reinterpret_cast<unsigned char const*>(
(_rbuffer.data() + sizeof(int32_t))),
size);
} catch (exceptions::corruption const& e) {
}
catch (corruption const& e) {
logging::debug(logging::medium) << e.what();
}
}
Expand All @@ -155,15 +159,16 @@ bool stream::read(std::shared_ptr<io::data>& data, time_t deadline) {
<< "compression: " << this
<< " got corrupted compressed data, skipping next byte";
if (!skipped)
logging::error(logging::high)
<< "compression: peer " << peer() << " is sending corrupted data";
logging::error(logging::high) << "compression: peer " << peer()
<< " is sending corrupted data";
++skipped;
_rbuffer.pop(1);
corrupted = true;
} else {
logging::debug(logging::low)
<< "compression: " << this << " uncompressed "
<< size + sizeof(int32_t) << " bytes to " << r->size() << " bytes";
logging::debug(logging::low) << "compression: " << this
<< " uncompressed "
<< size + sizeof(int32_t) << " bytes to "
<< r->size() << " bytes";
data = r;
_rbuffer.pop(size + sizeof(int32_t));
corrupted = false;
Expand All @@ -173,13 +178,16 @@ bool stream::read(std::shared_ptr<io::data>& data, time_t deadline) {
logging::info(logging::high)
<< "compression: peer " << peer() << " sent " << skipped
<< " corrupted compressed bytes, resuming processing";
} catch (exceptions::interrupt const& e) {
}
catch (exceptions::interrupt const& e) {
(void)e;
return true;
} catch (exceptions::timeout const& e) {
}
catch (exceptions::timeout const& e) {
(void)e;
return false;
} catch (exceptions::shutdown const& e) {
}
catch (exceptions::shutdown const& e) {
_shutdown = true;
if (!_wbuffer.empty()) {
std::shared_ptr<io::raw> r(new io::raw);
Expand Down Expand Up @@ -238,13 +246,14 @@ int stream::write(std::shared_ptr<io::data> const& d) {

// Check length.
if (r.size() > max_data_size)
throw exceptions::msg()
<< "cannot compress buffers longer than " << max_data_size
<< " bytes: you should report this error "
<< "to Centreon Broker developers";
throw msg_fmt(
"cannot compress buffers longer than {} bytes: you should report "
"this error to Centreon Broker developers",
max_data_size);
else if (r.size() > 0) {
// Append data to write buffer.
std::copy(r.get_buffer().begin(), r.get_buffer().end(),
std::copy(r.get_buffer().begin(),
r.get_buffer().end(),
std::back_inserter(_wbuffer));

// Send compressed data if size limit is reached.
Expand Down Expand Up @@ -275,10 +284,10 @@ void stream::_flush() {
std::shared_ptr<io::raw> compressed(new io::raw);
std::vector<char>& data(compressed->get_buffer());
data = std::move(zlib::compress(_wbuffer, _level));
logging::debug(logging::low)
<< "compression: " << this << " compressed " << _wbuffer.size()
<< " bytes to " << compressed->size() << " bytes (level " << _level
<< ")";
logging::debug(logging::low) << "compression: " << this << " compressed "
<< _wbuffer.size() << " bytes to "
<< compressed->size() << " bytes (level "
<< _level << ")";
_wbuffer.clear();

// Add compressed data size.
Expand Down
38 changes: 22 additions & 16 deletions src/ccb_core/compression/zlib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
#include "com/centreon/broker/compression/zlib.hh"
#include <zlib.h>
#include "com/centreon/broker/compression/stream.hh"
#include "com/centreon/broker/exceptions/corruption.hh"
#include "com/centreon/exceptions/corruption.hh"
#include "com/centreon/broker/logging/logging.hh"

using namespace com::centreon::exceptions;
using namespace com::centreon::broker::compression;

/**************************************
Expand Down Expand Up @@ -53,8 +54,10 @@ std::vector<char> zlib::compress(std::vector<char> const& data,
int res;
do {
retval.resize(len + 4);
res = ::compress2(reinterpret_cast<Bytef*>(retval.data()) + 4, &len,
reinterpret_cast<Bytef const*>(&data[0]), nbytes,
res = ::compress2(reinterpret_cast<Bytef*>(retval.data()) + 4,
&len,
reinterpret_cast<Bytef const*>(&data[0]),
nbytes,
compression_level);

switch (res) {
Expand All @@ -66,8 +69,8 @@ std::vector<char> zlib::compress(std::vector<char> const& data,
retval[3] = (nbytes & 0xff);
break;
case Z_MEM_ERROR:
throw(exceptions::msg() << "compression: not enough memory to compress "
<< nbytes << " bytes");
throw msg_fmt("compression: not enough memory to compress {} bytes",
nbytes);
break;
case Z_BUF_ERROR:
len <<= 1;
Expand Down Expand Up @@ -95,36 +98,39 @@ std::vector<char> zlib::uncompress(unsigned char const* data, uLong nbytes) {
if (nbytes <= 4) {
if (nbytes < 4 ||
(data[0] != 0 || data[1] != 0 || data[2] != 0 || data[3] != 0))
throw exceptions::corruption()
<< "compression: attempting to uncompress data with invalid size";
throw corruption(
"compression: attempting to uncompress data with invalid size");
}
ulong expected_size =
(data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3];
ulong len = (expected_size > 1ul) ? expected_size : 1ul;
if (len > stream::max_data_size)
throw exceptions::corruption()
<< "compression: data expected size is too big";
throw corruption("compression: data expected size is too big");
std::vector<char> uncompressed_array(len, '\0');

ulong alloc = len;

int res = ::uncompress(reinterpret_cast<Bytef*>(uncompressed_array.data()),
&len, static_cast<Bytef const*>(data) + 4, nbytes - 4);
&len,
static_cast<Bytef const*>(data) + 4,
nbytes - 4);

switch (res) {
case Z_OK:
if (len != alloc)
uncompressed_array.resize(len);
break;
case Z_MEM_ERROR:
throw exceptions::msg()
<< "compression: not enough memory to uncompress " << nbytes
<< " compressed bytes to " << len << " uncompressed bytes";
throw msg_fmt(
"compression: not enough memory to uncompress {} compressed bytes to "
"{} uncompressed bytes",
nbytes,
len);
case Z_BUF_ERROR:
case Z_DATA_ERROR:
throw exceptions::corruption()
<< "compression: compressed input data is corrupted, "
<< "unable to uncompress it";
throw corruption(
"compression: compressed input data is corrupted, unable to "
"uncompress it");
}
return uncompressed_array;
}
8 changes: 5 additions & 3 deletions tests/broker/compression/stream/write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@
#include <gtest/gtest.h>
#include "com/centreon/broker/compression/stream.hh"
#include "com/centreon/broker/config/applier/init.hh"
#include "com/centreon/broker/exceptions/msg.hh"
#include "com/centreon/exceptions/msg_fmt.hh"
#include "com/centreon/broker/exceptions/shutdown.hh"
#include "com/centreon/broker/io/raw.hh"
#include "memory_stream.hh"

using namespace com::centreon::exceptions;
using namespace com::centreon::broker;

class CompressionStreamWrite : public ::testing::Test {
public:
void SetUp() override {
try {
config::applier::init();
} catch (std::exception const& e) {
}
catch (std::exception const& e) {
(void)e;
}
_stream.reset(new compression::stream(-1, 20000));
Expand Down Expand Up @@ -150,7 +152,7 @@ TEST_F(CompressionStreamWrite, TooMuchData) {
r->resize(compression::stream::max_data_size + 10);

// Then
ASSERT_THROW(_stream->write(r), exceptions::msg);
ASSERT_THROW(_stream->write(r), msg_fmt);
}

// Given a compression stream
Expand Down

0 comments on commit e2a42b2

Please sign in to comment.