From b5ce0ada48043ff4ac5a5134dc8709a04cdf0776 Mon Sep 17 00:00:00 2001 From: Stephan Schim Date: Wed, 19 Jul 2023 16:09:27 +0200 Subject: [PATCH 1/3] active_poller and poller_t support for file descriptors --- zmq.hpp | 16 ++++++++++++++++ zmq_addon.hpp | 46 ++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/zmq.hpp b/zmq.hpp index 3fa484c..d67498f 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2681,6 +2681,13 @@ template class poller_t } } + void remove(fd_t fd) + { + if (0 != zmq_poller_remove_fd(poller_ptr.get(), fd)) { + throw error_t(); + } + } + void modify(zmq::socket_ref socket, event_flags events) { if (0 @@ -2690,6 +2697,15 @@ template class poller_t } } + void modify(fd_t fd, event_flags events) + { + if (0 + != zmq_poller_modify_fd(poller_ptr.get(), fd, + static_cast(events))) { + throw error_t(); + } + } + size_t wait_all(std::vector &poller_events, const std::chrono::milliseconds timeout) { diff --git a/zmq_addon.hpp b/zmq_addon.hpp index 958eec5..7157a9a 100644 --- a/zmq_addon.hpp +++ b/zmq_addon.hpp @@ -683,10 +683,12 @@ class active_poller_t void add(zmq::socket_ref socket, event_flags events, handler_type handler) { + const Ref ref{socket}; + if (!handler) - throw std::invalid_argument("null handler in active_poller_t::add"); + throw std::invalid_argument("null handler in active_poller_t::add (socket)"); auto ret = handlers.emplace( - socket, std::make_shared(std::move(handler))); + ref, std::make_shared(std::move(handler))); if (!ret.second) throw error_t(EINVAL); // already added try { @@ -695,7 +697,28 @@ class active_poller_t } catch (...) { // rollback - handlers.erase(socket); + handlers.erase(ref); + throw; + } + } + + void add(fd_t fd, event_flags events, handler_type handler) + { + const Ref ref{fd}; + + if (!handler) + throw std::invalid_argument("null handler in active_poller_t::add (fd)"); + auto ret = handlers.emplace( + ref, std::make_shared(std::move(handler))); + if (!ret.second) + throw error_t(EINVAL); // already added + try { + base_poller.add(fd, events, ret.first->second.get()); + need_rebuild = true; + } + catch (...) { + // rollback + handlers.erase(ref); throw; } } @@ -707,11 +730,23 @@ class active_poller_t need_rebuild = true; } + void remove(fd_t fd) + { + base_poller.remove(fd); + handlers.erase(fd); + need_rebuild = true; + } + void modify(zmq::socket_ref socket, event_flags events) { base_poller.modify(socket, events); } + void modify(fd_t fd, event_flags events) + { + base_poller.modify(fd, events); + } + size_t wait(std::chrono::milliseconds timeout) { if (need_rebuild) { @@ -741,7 +776,10 @@ class active_poller_t bool need_rebuild{false}; poller_t base_poller{}; - std::unordered_map> handlers{}; + + using Ref = std::variant; + std::unordered_map> handlers{}; + std::vector poller_events{}; std::vector> poller_handlers{}; }; // class active_poller_t From dd67d56eae2fbb8284403bfd911dbb308003c68f Mon Sep 17 00:00:00 2001 From: Stephan Schim Date: Wed, 19 Jul 2023 17:06:48 +0200 Subject: [PATCH 2/3] unit test for active_poller and poller_t support for file descriptors --- tests/active_poller.cpp | 84 +++++++++++++++++++++++++++++++++++++++++ zmq_addon.hpp | 1 + 2 files changed, 85 insertions(+) diff --git a/tests/active_poller.cpp b/tests/active_poller.cpp index 224b371..86c35d0 100644 --- a/tests/active_poller.cpp +++ b/tests/active_poller.cpp @@ -6,6 +6,11 @@ #include #include +#include + +#if !defined(_WIN32) + #include +#endif // !_WIN32 TEST_CASE("create destroy", "[active_poller]") { @@ -86,6 +91,85 @@ TEST_CASE("add handler", "[active_poller]") active_poller.add(socket, zmq::event_flags::pollin, no_op_handler)); } +TEST_CASE("add fd handler", "[active_poller]") +{ + int fd = 1; + zmq::active_poller_t active_poller; + CHECK_NOTHROW( + active_poller.add(fd, zmq::event_flags::pollin, no_op_handler)); +} + +TEST_CASE("remove fd handler", "[active_poller]") +{ + int fd = 1; + zmq::active_poller_t active_poller; + CHECK_NOTHROW( + active_poller.add(fd, zmq::event_flags::pollin, no_op_handler)); + CHECK_NOTHROW( + active_poller.remove(fd)); + CHECK_THROWS_ZMQ_ERROR(EINVAL, active_poller.remove(100)); +} + +#if !defined(_WIN32) +// On Windows, these functions can only be used with WinSock sockets. + +TEST_CASE("mixed socket and fd handlers", "[active_poller]") +{ + int pipefd[2]; + ::pipe(pipefd); + + zmq::context_t context; + constexpr char inprocSocketAddress[] = "inproc://mixed-handlers"; + zmq::socket_t socket_rcv{context, zmq::socket_type::pair}; + zmq::socket_t socket_snd{context, zmq::socket_type::pair}; + socket_rcv.bind(inprocSocketAddress); + socket_snd.connect(inprocSocketAddress); + + unsigned eventsFd = 0; + unsigned eventsSocket = 0; + + constexpr char messageText[] = "message"; + constexpr size_t messageSize = sizeof(messageText); + + zmq::active_poller_t active_poller; + CHECK_NOTHROW( + active_poller.add(pipefd[0], zmq::event_flags::pollin, [&](zmq::event_flags flags) { + if (flags == zmq::event_flags::pollin) + { + char buffer[256]; + CHECK(messageSize == ::read(pipefd[0], buffer, messageSize)); + CHECK(0 == std::strcmp(buffer, messageText)); + ++eventsFd; + } + })); + CHECK_NOTHROW( + active_poller.add(socket_rcv, zmq::event_flags::pollin, [&](zmq::event_flags flags) { + if (flags == zmq::event_flags::pollin) + { + zmq::message_t msg; + CHECK(socket_rcv.recv(msg, zmq::recv_flags::dontwait).has_value()); + CHECK(messageSize == msg.size()); + CHECK(0 == std::strcmp(messageText, msg.data())); + ++eventsSocket; + } + })); + + // send/rcv socket pair + zmq::message_t msg{messageText, messageSize}; + socket_snd.send(msg, zmq::send_flags::dontwait); + CHECK(1 == active_poller.wait(std::chrono::milliseconds{100})); + CHECK(0 == eventsFd); + CHECK(1 == eventsSocket); + + // send/rcv pipe + ::write(pipefd[1], messageText, messageSize); + CHECK(1 == active_poller.wait(std::chrono::milliseconds{100})); + CHECK(1 == eventsFd); + CHECK(1 == eventsSocket); +} + +#endif // !_WIN32 + TEST_CASE("add null handler fails", "[active_poller]") { zmq::context_t context; diff --git a/zmq_addon.hpp b/zmq_addon.hpp index 7157a9a..9e7ef4c 100644 --- a/zmq_addon.hpp +++ b/zmq_addon.hpp @@ -34,6 +34,7 @@ #include #include #include +#include #endif namespace zmq From 2533a7e7a2db3cdd70a5adc4a9fbf4e305f3882a Mon Sep 17 00:00:00 2001 From: Stephan Schim Date: Tue, 25 Jul 2023 19:11:42 +0200 Subject: [PATCH 3/3] socket ref or native file descriptor for poller without use of std::variant_t --- zmq_addon.hpp | 68 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/zmq_addon.hpp b/zmq_addon.hpp index 9e7ef4c..d92a56d 100644 --- a/zmq_addon.hpp +++ b/zmq_addon.hpp @@ -34,8 +34,65 @@ #include #include #include -#include -#endif + +namespace zmq +{ + // socket ref or native file descriptor for poller + class poller_ref_t + { + public: + enum RefType + { + RT_SOCKET, + RT_FD + }; + + poller_ref_t() : poller_ref_t(socket_ref{}) + {} + + poller_ref_t(const zmq::socket_ref& socket) : data{RT_SOCKET, socket, {}} + {} + + poller_ref_t(zmq::fd_t fd) : data{RT_FD, {}, fd} + {} + + size_t hash() const ZMQ_NOTHROW + { + std::size_t h = 0; + hash_combine(h, std::get<0>(data)); + hash_combine(h, std::get<1>(data)); + hash_combine(h, std::get<2>(data)); + return h; + } + + bool operator == (const poller_ref_t& o) const ZMQ_NOTHROW + { + return data == o.data; + } + + private: + template + static void hash_combine(std::size_t& seed, const T& v) ZMQ_NOTHROW + { + std::hash hasher; + seed ^= hasher(v) + 0x9e3779b9 + (seed<<6) + (seed>>2); + } + + std::tuple data; + + }; // class poller_ref_t + +} // namespace zmq + +// std::hash<> specialization for std::unordered_map +template <> struct std::hash +{ + size_t operator()(const zmq::poller_ref_t& ref) const ZMQ_NOTHROW + { + return ref.hash(); + } +}; +#endif // ZMQ_CPP11 namespace zmq { @@ -684,7 +741,7 @@ class active_poller_t void add(zmq::socket_ref socket, event_flags events, handler_type handler) { - const Ref ref{socket}; + const poller_ref_t ref{socket}; if (!handler) throw std::invalid_argument("null handler in active_poller_t::add (socket)"); @@ -705,7 +762,7 @@ class active_poller_t void add(fd_t fd, event_flags events, handler_type handler) { - const Ref ref{fd}; + const poller_ref_t ref{fd}; if (!handler) throw std::invalid_argument("null handler in active_poller_t::add (fd)"); @@ -778,8 +835,7 @@ class active_poller_t poller_t base_poller{}; - using Ref = std::variant; - std::unordered_map> handlers{}; + std::unordered_map> handlers{}; std::vector poller_events{}; std::vector> poller_handlers{};