Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/expose monitor socket for active poller #612

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 89 additions & 3 deletions tests/monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class mock_monitor_t : public zmq::monitor_t
{
public:
public:

void on_event_connected(const zmq_event_t &, const char *) ZMQ_OVERRIDE
{
Expand Down Expand Up @@ -89,7 +89,7 @@ TEST_CASE("monitor init abort", "[monitor]")
{
class mock_monitor : public mock_monitor_t
{
public:
public:
mock_monitor(std::function<void(void)> handle_connected) :
handle_connected{std::move(handle_connected)}
{
Expand Down Expand Up @@ -128,7 +128,7 @@ TEST_CASE("monitor init abort", "[monitor]")
{
std::unique_lock<std::mutex> lock(mutex);
CHECK(cond_var.wait_for(lock, std::chrono::seconds(1),
[&done] { return done; }));
[&done] { return done; }));
}
CHECK(monitor.connected == 1);
monitor.abort();
Expand All @@ -150,3 +150,89 @@ TEST_CASE("monitor from move assigned socket", "[monitor]")
// failing
}
#endif

#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \
&& !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER)
#include "zmq_addon.hpp"

TEST_CASE("poll monitor events using active poller", "[monitor]")
{
// define necessary class for test
class test_monitor : public zmq::monitor_t
{
public:
void init(zmq::socket_t &socket,
const char *const addr_,
int events = ZMQ_EVENT_ALL)
{
zmq::monitor_t::init(socket, addr_, events);
}

void addToPoller(zmq::active_poller_t &inActivePoller)
{
inActivePoller.add(
monitor_socket(), zmq::event_flags::pollin,
[&](zmq::event_flags ef) { process_event(static_cast<short>(ef)); });
}

void on_event_accepted(const zmq_event_t &event_,
const char *addr_) override
{
clientAccepted++;
}
void on_event_disconnected(const zmq_event_t &event,
const char *const addr) override
{
clientDisconnected++;
}

int clientAccepted = 0;
int clientDisconnected = 0;
};

//Arrange
int messageCounter = 0;
const char monitorAddress[] = "inproc://monitor-server";

auto addToPoller = [&](zmq::socket_t &socket, zmq::active_poller_t &poller) {
poller.add(socket, zmq::event_flags::pollin, [&](zmq::event_flags ef) {
zmq::message_t msg;
auto result = socket.recv(msg, zmq::recv_flags::dontwait);
messageCounter++;
});
};

common_server_client_setup sockets(false);

test_monitor monitor;
monitor.init(sockets.server, monitorAddress);

zmq::active_poller_t poller;
monitor.addToPoller(poller);
addToPoller(sockets.server, poller);

sockets.init();
sockets.client.send(zmq::message_t(0), zmq::send_flags::dontwait);
CHECK(monitor.clientAccepted == 0);
CHECK(monitor.clientDisconnected == 0);

//Act
for (int i = 0; i < 10; i++) {
poller.wait(std::chrono::milliseconds(10));
dietelTiMaMi marked this conversation as resolved.
Show resolved Hide resolved
}
CHECK(monitor.clientAccepted == 1);
CHECK(monitor.clientDisconnected == 0);

sockets.client.close();

for (int i = 0; i < 10; i++) {
poller.wait(std::chrono::milliseconds(10));
}
sockets.server.close();

// Assert
CHECK(messageCounter == 1);
CHECK(monitor.clientAccepted == 1);
CHECK(monitor.clientDisconnected == 1);
}
#endif
212 changes: 110 additions & 102 deletions zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2362,8 +2362,6 @@ class monitor_t
{
assert(_monitor_socket);

zmq::message_t eventMsg;

zmq::pollitem_t items[] = {
{_monitor_socket.handle(), 0, ZMQ_POLLIN, 0},
};
Expand All @@ -2374,106 +2372,7 @@ class monitor_t
zmq::poll(&items[0], 1, timeout);
#endif

if (items[0].revents & ZMQ_POLLIN) {
int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0);
if (rc == -1 && zmq_errno() == ETERM)
return false;
assert(rc != -1);

} else {
return false;
}

#if ZMQ_VERSION_MAJOR >= 4
const char *data = static_cast<const char *>(eventMsg.data());
zmq_event_t msgEvent;
memcpy(&msgEvent.event, data, sizeof(uint16_t));
data += sizeof(uint16_t);
memcpy(&msgEvent.value, data, sizeof(int32_t));
zmq_event_t *event = &msgEvent;
#else
zmq_event_t *event = static_cast<zmq_event_t *>(eventMsg.data());
#endif

#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
zmq::message_t addrMsg;
int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0);
if (rc == -1 && zmq_errno() == ETERM) {
return false;
}

assert(rc != -1);
std::string address = addrMsg.to_string();
#else
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
std::string address = event->data.connected.addr;
#endif

#ifdef ZMQ_EVENT_MONITOR_STOPPED
if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
return false;
}

#endif

switch (event->event) {
case ZMQ_EVENT_CONNECTED:
on_event_connected(*event, address.c_str());
break;
case ZMQ_EVENT_CONNECT_DELAYED:
on_event_connect_delayed(*event, address.c_str());
break;
case ZMQ_EVENT_CONNECT_RETRIED:
on_event_connect_retried(*event, address.c_str());
break;
case ZMQ_EVENT_LISTENING:
on_event_listening(*event, address.c_str());
break;
case ZMQ_EVENT_BIND_FAILED:
on_event_bind_failed(*event, address.c_str());
break;
case ZMQ_EVENT_ACCEPTED:
on_event_accepted(*event, address.c_str());
break;
case ZMQ_EVENT_ACCEPT_FAILED:
on_event_accept_failed(*event, address.c_str());
break;
case ZMQ_EVENT_CLOSED:
on_event_closed(*event, address.c_str());
break;
case ZMQ_EVENT_CLOSE_FAILED:
on_event_close_failed(*event, address.c_str());
break;
case ZMQ_EVENT_DISCONNECTED:
on_event_disconnected(*event, address.c_str());
break;
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
on_event_handshake_failed_no_detail(*event, address.c_str());
break;
case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
on_event_handshake_failed_protocol(*event, address.c_str());
break;
case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
on_event_handshake_failed_auth(*event, address.c_str());
break;
case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
on_event_handshake_succeeded(*event, address.c_str());
break;
#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
case ZMQ_EVENT_HANDSHAKE_FAILED:
on_event_handshake_failed(*event, address.c_str());
break;
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
on_event_handshake_succeed(*event, address.c_str());
break;
#endif
default:
on_event_unknown(*event, address.c_str());
break;
}

return true;
return process_event(items[0].revents);
}

#ifdef ZMQ_EVENT_MONITOR_STOPPED
Expand Down Expand Up @@ -2583,6 +2482,115 @@ class monitor_t
(void) addr_;
}

protected:
bool process_event(short events)
{
zmq::message_t eventMsg;

if (events & ZMQ_POLLIN) {
int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0);
if (rc == -1 && zmq_errno() == ETERM)
return false;
assert(rc != -1);

} else {
return false;
}

#if ZMQ_VERSION_MAJOR >= 4
const char *data = static_cast<const char *>(eventMsg.data());
zmq_event_t msgEvent;
memcpy(&msgEvent.event, data, sizeof(uint16_t));
data += sizeof(uint16_t);
memcpy(&msgEvent.value, data, sizeof(int32_t));
zmq_event_t *event = &msgEvent;
#else
zmq_event_t *event = static_cast<zmq_event_t *>(eventMsg.data());
#endif

#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
zmq::message_t addrMsg;
int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0);
if (rc == -1 && zmq_errno() == ETERM) {
return false;
}

assert(rc != -1);
std::string address = addrMsg.to_string();
#else
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
std::string address = event->data.connected.addr;
#endif

#ifdef ZMQ_EVENT_MONITOR_STOPPED
if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
return false;
}

#endif

switch (event->event) {
case ZMQ_EVENT_CONNECTED:
on_event_connected(*event, address.c_str());
break;
case ZMQ_EVENT_CONNECT_DELAYED:
on_event_connect_delayed(*event, address.c_str());
break;
case ZMQ_EVENT_CONNECT_RETRIED:
on_event_connect_retried(*event, address.c_str());
break;
case ZMQ_EVENT_LISTENING:
on_event_listening(*event, address.c_str());
break;
case ZMQ_EVENT_BIND_FAILED:
on_event_bind_failed(*event, address.c_str());
break;
case ZMQ_EVENT_ACCEPTED:
on_event_accepted(*event, address.c_str());
break;
case ZMQ_EVENT_ACCEPT_FAILED:
on_event_accept_failed(*event, address.c_str());
break;
case ZMQ_EVENT_CLOSED:
on_event_closed(*event, address.c_str());
break;
case ZMQ_EVENT_CLOSE_FAILED:
on_event_close_failed(*event, address.c_str());
break;
case ZMQ_EVENT_DISCONNECTED:
on_event_disconnected(*event, address.c_str());
break;
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
on_event_handshake_failed_no_detail(*event, address.c_str());
break;
case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
on_event_handshake_failed_protocol(*event, address.c_str());
break;
case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
on_event_handshake_failed_auth(*event, address.c_str());
break;
case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
on_event_handshake_succeeded(*event, address.c_str());
break;
#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
case ZMQ_EVENT_HANDSHAKE_FAILED:
on_event_handshake_failed(*event, address.c_str());
break;
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
on_event_handshake_succeed(*event, address.c_str());
break;
#endif
default:
on_event_unknown(*event, address.c_str());
break;
}

return true;
}

socket_ref monitor_socket() {return _monitor_socket;}

private:
monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION;
void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION;
Expand Down
Loading