Skip to content

Commit

Permalink
MON-16333 issues after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-christophe81 committed Mar 17, 2023
1 parent 9d8ef28 commit bfbdae9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 32 deletions.
13 changes: 6 additions & 7 deletions broker/core/src/config/applier/init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,19 @@ void config::applier::init(size_t n_thread,
void config::applier::deinit() {
mode = finished;
config::applier::endpoint::unload();
std::shared_ptr<multiplexing::engine> engine_instance =
multiplexing::engine::instance_ptr();
if (engine_instance) {
engine_instance->stop();
{
auto eng = multiplexing::engine::instance_ptr();
if (eng)
eng->clear();
multiplexing::engine::unload();
}
config::applier::state::unload();
io::events::unload();
io::protocols::unload();
mysql_manager::unload();
stats::center::unload();
file::disk_accessor::unload();
if (engine_instance) {
multiplexing::engine::unload();
}

pool::unload();
}

Expand Down
1 change: 0 additions & 1 deletion broker/core/src/multiplexing/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ std::mutex engine::_load_m;
* @return Class instance.
*/
std::shared_ptr<engine> engine::instance_ptr() {
std::lock_guard<std::mutex> lk(_load_m);
return _instance;
}

Expand Down
79 changes: 55 additions & 24 deletions broker/core/src/multiplexing/muxer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,33 @@ muxer::muxer(std::string name,
_name, _events_size, _file ? "enable" : "disable");
}

engine::instance_ptr()->subscribe(this);
/**
* @brief muxer must be in a shared_ptr
* so this static method creates it and registers it in engine
*
* @param name
* @param r_filters
* @param w_filters
* @param persistent
* @return std::shared_ptr<muxer>
*/
std::shared_ptr<muxer> muxer::create(std::string name,
muxer::filters r_filters,
muxer::filters w_filters,
bool persistent) {
std::shared_ptr<muxer> ret(new muxer(name, r_filters, w_filters, persistent));
engine::instance_ptr()->subscribe(ret);
return ret;
}

/**
* Destructor.
*/
muxer::~muxer() noexcept {
stats::center::instance().unregister_muxer(_name);
engine::instance_ptr()->unsubscribe(this);
auto eng = engine::instance_ptr();
if (eng)
eng->unsubscribe(this);
std::lock_guard<std::mutex> lock(_mutex);
log_v2::core()->info("Destroying muxer {}: number of events in the queue: {}",
_name, _events_size);
Expand Down Expand Up @@ -201,32 +219,45 @@ uint32_t muxer::event_queue_max_size() noexcept {
*
* @param[in] event Event to add.
*/
void muxer::publish(const std::shared_ptr<io::data> event) {
if (event) {
SPDLOG_LOGGER_TRACE(log_v2::core(), "{} publish {}", _name, *event);

std::lock_guard<std::mutex> lock(_mutex);
// Check if we should process this event.
if (_write_filters.find(event->type()) == _write_filters.end()) {
SPDLOG_LOGGER_TRACE(log_v2::core(), "{} reject {}", _name, *event);
void muxer::publish(const std::deque<std::shared_ptr<io::data>>& event_queue) {
auto evt = event_queue.begin();
while (evt != event_queue.end()) {
bool at_least_one_push_to_queue = false;
{ // we stop this first loop when mux queue is full on order to release
// mutex to let read do his job before write to file
std::lock_guard<std::mutex> lock(_mutex);
for (; evt != event_queue.end() && _events_size < event_queue_max_size();
++evt) {
if (_write_filters.find((*evt)->type()) == _write_filters.end()) {
continue;
}
at_least_one_push_to_queue = true;
log_v2::core()->trace("muxer::publish {} publish one event to queue");
_push_to_queue(*evt);
}
}
if (evt == event_queue.end()) {
return;
}
// Check if the event queue limit is reach.
if (_events_size >= event_queue_max_size()) {
// Try to create file if is necessary.
if (!_file) {
QueueFileStats* s =
stats::center::instance().muxer_stats(_name)->mutable_queue_file();
_file = std::make_unique<persistent_file>(_queue_file_name, s);
// we have stopped insertion because of full queue => retry
if (at_least_one_push_to_queue) {
continue;
}
// nothing pushed => to file
std::lock_guard<std::mutex> lock(_mutex);
if (!_file) {
QueueFileStats* s =
stats::center::instance().muxer_stats(_name)->mutable_queue_file();
_file = std::make_unique<persistent_file>(_queue_file_name, s);
}
for (; evt != event_queue.end(); ++evt) {
if (_write_filters.find((*evt)->type()) == _write_filters.end()) {
continue;
}

SPDLOG_LOGGER_TRACE(log_v2::core(), "{} push to file {}", _name, *event);
_file->write(event);
} else {
SPDLOG_LOGGER_TRACE(log_v2::core(), "{} push to queue {}", _name, *event);
_push_to_queue(event);
_file->write(*evt);
log_v2::core()->trace("muxer::publish {} publish one event to file {}",
_name, _queue_file_name);
}
_update_stats();
}
_update_stats();
}
Expand Down

0 comments on commit bfbdae9

Please sign in to comment.