Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Potential issues, maybe calling `->stop()` on the loop after a file has
been opened, but before the `->read()` has completed, isn't defined????

Current error:

  0x00007ffff71403cf in ?? () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so
  (gdb) bt
  #0  0x00007ffff71403cf in ?? () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so
  #1  0x00007ffff712e7f7 in uv_run () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so
  #2  0x00007ffff7116d59 in ?? () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so
  #3  0x00007ffff7117ecb in std::_Function_handler<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> (), std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<void>, std::__future_base::_Result_base::_Deleter>, std::thread::_Invoker<std::tuple<IRLoggerToSpd::impl::start_thread(std::filesystem::__cxx11::path const&)::{lambda()#1}> >, void> >::_M_invoke(std::_Any_data const&) ()
     from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so
  #4  0x00007ffff7111b3d in std::__future_base::_State_baseV2::_M_do_set(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>*, bool*) ()
     from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so
  #5  0x00007ffff7ce2f68 in __pthread_once_slow (once_control=0x555555cf5ff8, init_routine=0x7ffff6f6fd50 <__once_proxy>) at ./nptl/pthread_once.c:116
  #6  0x00007ffff7115dd2 in std::__future_base::_Async_state_impl<std::thread::_Invoker<std::tuple<IRLoggerToSpd::impl::start_thread(std::filesystem::__cxx11::path const&)::{lambda()#1}> >, void>::_M_run() ()
     from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so
  #7  0x00007ffff6f71253 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
  #8  0x00007ffff7cddb43 in start_thread (arg=<optimised out>) at ./nptl/pthread_create.c:442
  #9  0x00007ffff7d6fa00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
  • Loading branch information
aloisklink committed Oct 16, 2023
1 parent 64c6cc6 commit e4bfd2d
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 115 deletions.
11 changes: 11 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ target_link_libraries(irimager_class
spdlog::spdlog_header_only # less efficient, but avoids CXX11 ABI issues
)

set(MSAN TRUE)
if(ASAN)
target_compile_options(irimager INTERFACE $<$<CONFIG:Debug>:-fsanitize=address,undefined -fno-omit-frame-pointer -fno-common>)
target_link_libraries(irimager INTERFACE $<$<CONFIG:Debug>:-fsanitize=address,undefined>)
endif()

if(MSAN)
target_compile_options(irimager INTERFACE $<$<CONFIG:Debug>:-fno-omit-frame-pointer -fsanitize=memory>)
target_link_libraries(irimager INTERFACE $<$<CONFIG:Debug>:-fno-omit-frame-pointer -fsanitize=memory>)
endif()

if(IRImager_mock)
target_compile_definitions(irimager_class PRIVATE IR_IMAGER_MOCK)
else()
Expand Down
265 changes: 150 additions & 115 deletions src/nqm/irimager/irlogger_to_spd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
#include <memory>
#include <uvw.hpp>

#define uvw_error_event_handler \
([](const uvw::error_event &error, auto &) { \
spdlog::error("uvw error {}", error.what()); \
})

class IRLoggerToSpd::impl {
public:
impl(const std::filesystem::path &socket_path) { start_thread(socket_path); }
Expand All @@ -29,116 +34,146 @@ class IRLoggerToSpd::impl {
[this,
socket_path // capture by value, since this will run in another thread
] {
spdlog::debug("started in new thread");
spdlog::debug("started in new thread");

auto loop = uvw::loop::get_default();
auto loop = uvw::loop::get_default();

// TODO: do we need to check for nullptr?
this->message = loop->resource<uvw::async_handle>();
// TODO: do we need to check for nullptr?
this->message = loop->resource<uvw::async_handle>();

spdlog::debug("checking for early stop");
spdlog::debug("checking for early stop");

if (this->stop) { // in case calling thread stopped this before
// `message` was created
spdlog::info("early exit!");
loop->stop();
}
if (this->stop) { // in case calling thread stopped this before
// `message` was created
spdlog::info("early exit!");
loop->stop();
}

this->message->on<uvw::async_event>(
[this, &loop](const uvw::async_event &, uvw::async_handle &) {
if (this->stop) {
loop->stop();
}
});
this->message->on<uvw::error_event>(uvw_error_event_handler);
this->message->on<uvw::async_event>(
[this, &loop](const uvw::async_event &, uvw::async_handle &) {
if (this->stop) {
loop->stop();
}
});

spdlog::debug("message handler added");
spdlog::debug("message handler added");

#ifdef FIFO_STREAMING_WORKKING
auto log_file_reader = loop->resource<uvw::pipe_handle>();

log_file_reader->on<uvw::connect_event>(
[](const uvw::connect_event &, uvw::pipe_handle &pipe_handle) {
spdlog::debug("Connection");
pipe_handle.on<uvw::end_event>(
[](const uvw::end_event &, uvw::pipe_handle &srv) {
srv.close();
});
pipe_handle.on<uvw::data_event>(
[](const uvw::data_event &data, uvw::pipe_handle &) {
auto string_data =
std::string_view(data.data.get(), data.length);

// TODO PARSE LOGS
spdlog::info(string_data);
});
pipe_handle.read();
});

log_file_reader->on<uvw::error_event>(
[&loop](const uvw::error_event &error, uvw::pipe_handle &) {
spdlog::error("log_file_reader error {}", error.what());
});

spdlog::debug("Connecting to socket {}", socket_path.string());
log_file_reader->connect(socket_path.string());
log_file_reader->read();
auto log_file_reader = loop->resource<uvw::pipe_handle>();

log_file_reader->on<uvw::connect_event>(
[](const uvw::connect_event &, uvw::pipe_handle &pipe_handle) {
spdlog::debug("Connection");
pipe_handle.on<uvw::end_event>(
[](const uvw::end_event &, uvw::pipe_handle &srv) {
srv.close();
});
pipe_handle.on<uvw::data_event>([](const uvw::data_event &data,
uvw::pipe_handle &) {
auto string_data = std::string_view(data.data.get(), data.length);

// TODO PARSE LOGS
spdlog::info(string_data);
});
pipe_handle.read();
});

log_file_reader->on<uvw::error_event>(uvw_error_event_handler);

spdlog::debug("Connecting to socket {}", socket_path.string());
log_file_reader->connect(socket_path.string());
log_file_reader->read();
#endif

#define UVW_FILE_STREAMING 1
#ifdef UVW_FILE_STREAMING
/**
* This currently works, but it's very inefficient, since it maxes
* out the CPU to 100%!
*/
auto log_file_reader = loop->resource<uvw::file_req>();

size_t file_offset = 0;
log_file_reader->on<uvw::fs_event>([&](const auto &event, auto &req) {
switch (event.type) {
case uvw::fs_req::fs_type::READ: {
if (event.result != 0) {
auto string_data =
std::string_view(event.read.data.get(), event.result);
file_offset += event.result;

// TODO: PARSE LOGS somehow?
spdlog::info(string_data);
}

[[fallthrough]];
}
case uvw::fs_req::fs_type::OPEN:
req.read(static_cast<int64_t>(file_offset), 16);
break;
default:
spdlog::error(
"Unexpected fs_req type {}",
static_cast<std::underlying_type_t<uvw::fs_req::fs_type>>(
event.type));
[[fallthrough]];
case uvw::fs_req::fs_type::CLOSE:
spdlog::debug("Closing file {}", socket_path.string());
log_file_reader->close();
/**
* This currently works, but it's very inefficient, since it maxes
* out the CPU to 100%!
*/
auto log_file_reader = loop->resource<uvw::file_req>();

size_t file_offset = 0;
constexpr unsigned int MAX_READ_SIZE = 512;
bool reading = false;

log_file_reader->on<uvw::error_event>(uvw_error_event_handler);
log_file_reader->on<uvw::fs_event>([&](const auto &event, auto &req) {
spdlog::debug("Got fs_event");
switch (event.type) {
case uvw::fs_req::fs_type::READ: {
if (event.result == 0) {
spdlog::trace("Empty read, stop reading");
// empty read, stop reading until `log_file_watcher` restarts
// reading
reading = false;
break;
}
});
spdlog::trace("Got {} bytes of data", event.result);
auto string_data =
std::string_view(event.read.data.get(), event.result);
file_offset += event.result;

// TODO: PARSE LOGS somehow?
spdlog::info(string_data);
[[fallthrough]]; // read the next bit of data
}
case uvw::fs_req::fs_type::OPEN:
reading = true;
req.read(static_cast<int64_t>(file_offset), MAX_READ_SIZE);
break;
default:
spdlog::error(
"Unexpected fs_req type {}",
static_cast<std::underlying_type_t<uvw::fs_req::fs_type>>(
event.type));
[[fallthrough]];
case uvw::fs_req::fs_type::CLOSE:
spdlog::debug("Closing file {}", socket_path.string());
}

spdlog::debug("Reading from file at {}", socket_path.string());
log_file_reader->open(socket_path.string(),
uvw::file_req::file_open_flags::RDONLY, 0644);

#if 1
auto log_file_watcher = loop->resource<uvw::fs_event_handle>();
log_file_watcher->on<uvw::error_event>(uvw_error_event_handler);
log_file_watcher->on<uvw::fs_event_event>([&](const auto &, auto &) {
// may error if log_file_reader isn't yet open, but not a big deal
spdlog::debug("Got fs_event_event on file {}", socket_path.string());
if (!reading) {
reading = true;
log_file_reader->read(static_cast<int64_t>(file_offset),
MAX_READ_SIZE);
}
});
spdlog::debug("Watching file at {}", socket_path.string());
log_file_watcher->start(socket_path.string());

spdlog::debug("Reading from file at {}", socket_path.string());
log_file_reader->open(socket_path.string(),
uvw::file_req::file_open_flags::RDONLY, 0644);
#endif
#endif // POSIX hack

spdlog::debug("running loop in another thread");
loop->run();
spdlog::debug("running loop in another thread");
loop->run();

// TODO: is this line needed??
// If it is, should we use RAII to handle exceptions?
loop->walk([](auto &&h) {
spdlog::debug("Closing handle");
h.close();
});
});
}

void stop_thread() {
stop = true;
if (message != nullptr) {
message->send();
}
stop = true;
if (message != nullptr) {
message->send();
}

logger_thread.get();
logger_thread.get();
}

std::shared_future<void> logger_thread;
Expand All @@ -147,35 +182,35 @@ class IRLoggerToSpd::impl {
std::shared_ptr<uvw::async_handle> message;
/** If `true`, stop the logger thread when a @p message is received */
std::atomic<bool> stop = false;
};

/**
* @see IRLoggerToSpd::IRLoggerToSpd() documentation.
*/
static std::filesystem::path default_socket_path() {
const char *xdg_runtime_dir_ptr = std::getenv("XDG_RUNTIME_DIR");
if (xdg_runtime_dir_ptr) {
auto xdg_runtime_dir =
std::filesystem::path(xdg_runtime_dir_ptr) / "nqm-irimager";
try {
std::filesystem::create_directory(xdg_runtime_dir);
return xdg_runtime_dir / "irlogger.fifo";
} catch (const std::filesystem::filesystem_error &e) {
// on no_such_file_or_directory error, just use temp_directory_path()
if (e.code() != std::errc::no_such_file_or_directory) {
throw e;
};

/**
* @see IRLoggerToSpd::IRLoggerToSpd() documentation.
*/
static std::filesystem::path default_socket_path() {
const char *xdg_runtime_dir_ptr = std::getenv("XDG_RUNTIME_DIR");
if (xdg_runtime_dir_ptr) {
auto xdg_runtime_dir =
std::filesystem::path(xdg_runtime_dir_ptr) / "nqm-irimager";
try {
std::filesystem::create_directory(xdg_runtime_dir);
return xdg_runtime_dir / "irlogger.fifo";
} catch (const std::filesystem::filesystem_error &e) {
// on no_such_file_or_directory error, just use temp_directory_path()
if (e.code() != std::errc::no_such_file_or_directory) {
throw e;
}
}
}
}

auto temp_dir = std::filesystem::temp_directory_path() / "nqm-irimager";
std::filesystem::create_directory(temp_dir);
return temp_dir / "irlogger.fifo";
}
auto temp_dir = std::filesystem::temp_directory_path() / "nqm-irimager";
std::filesystem::create_directory(temp_dir);
return temp_dir / "irlogger.fifo";
}

IRLoggerToSpd::IRLoggerToSpd() : IRLoggerToSpd(default_socket_path()) {}
IRLoggerToSpd::IRLoggerToSpd() : IRLoggerToSpd(default_socket_path()) {}

IRLoggerToSpd::IRLoggerToSpd(const std::filesystem::path &socket_path)
: pImpl{std::make_unique<IRLoggerToSpd::impl>(socket_path)} {}
IRLoggerToSpd::IRLoggerToSpd(const std::filesystem::path &socket_path)
: pImpl{std::make_unique<IRLoggerToSpd::impl>(socket_path)} {}

IRLoggerToSpd::~IRLoggerToSpd() {}
IRLoggerToSpd::~IRLoggerToSpd() {}

0 comments on commit e4bfd2d

Please sign in to comment.