Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/monitor poll #414

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 88 additions & 1 deletion tests/monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TEST_CASE("monitor move assign", "[monitor]")
}
}

TEST_CASE("monitor init event count", "[monitor]")
TEST_CASE("monitor init check event count", "[monitor]")
{
common_server_client_setup s{false};
mock_monitor_t monitor;
Expand All @@ -92,6 +92,93 @@ TEST_CASE("monitor init event count", "[monitor]")
CHECK(monitor.total == expected_event_count);
}

TEST_CASE("monitor init get event count", "[monitor]")
{
common_server_client_setup s{ false };
zmq::monitor_t monitor;

const int expected_event_count = 2;
monitor.init(s.client, "inproc://foo");

int total{ 0 };
int connect_delayed{ 0 };
int connected{ 0 };

auto lbd_count_event = [&](const zmq_event_t& event) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does lbd stand for?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"lbd" is short for "lambda". I like to differentiate "callback" methods over classical variable in my code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I am not sure if it's necessary to use a prefix at all (if it were a function pointer, would it make a difference?), but if you feel it's good to have, please use lambda_.

switch (event.event)
{
case ZMQ_EVENT_CONNECT_DELAYED:
connect_delayed++;
total++;
break;

case ZMQ_EVENT_CONNECTED:
connected++;
total++;
break;
}
};

zmq_event_t eventMsg;
std::string address;
CHECK_FALSE(monitor.get_event(eventMsg, address, zmq::recv_flags::dontwait));
s.init();

SECTION("get_event")
{
while (total < expected_event_count)
{
if (!monitor.get_event(eventMsg, address))
continue;

lbd_count_event(eventMsg);
}

}

SECTION("poll get_event")
{
while (total < expected_event_count)
{
zmq::pollitem_t items[] = {
{ monitor.handle(), 0, ZMQ_POLLIN, 0 },
};

zmq::poll(&items[0], 1, 100);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the overload taking std::chrono::milliseconds.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Roger that


if (!(items[0].revents & ZMQ_POLLIN)) {
continue;
}

CHECK(monitor.get_event(eventMsg, address));

lbd_count_event(eventMsg);
}
}

SECTION("poller_t get_event")
{
zmq::poller_t<> poller;
CHECK_NOTHROW(poller.add(monitor, zmq::event_flags::pollin));

while (total < expected_event_count)
{
std::vector<zmq::poller_event<>> events(1);
if(0 == poller.wait_all(events, std::chrono::milliseconds{ 100 }))
continue;

CHECK(zmq::event_flags::pollin == events[0].events);
CHECK(monitor.get_event(eventMsg, address));

lbd_count_event(eventMsg);
}
}

CHECK(connect_delayed == 1);
CHECK(connected == 1);
CHECK(total == expected_event_count);
}

TEST_CASE("monitor init abort", "[monitor]")
{
class mock_monitor : public mock_monitor_t
Expand Down
73 changes: 66 additions & 7 deletions zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2224,6 +2224,63 @@ class monitor_t
on_monitor_started();
}

operator void *() ZMQ_NOTHROW { return handle(); }

operator void const *() const ZMQ_NOTHROW { return handle(); }
Comment on lines +2227 to +2229
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not add these operators. socket_t and context_t have them because they are direct correspondents of the underlying C data structures. But that's not the case here, and it's not obvious that a cast to void* returns the monitor socket handle.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.


ZMQ_NODISCARD void *handle() ZMQ_NOTHROW { return _monitor_socket.handle(); }

ZMQ_NODISCARD const void *handle() const ZMQ_NOTHROW { return _monitor_socket.handle(); }
Comment on lines +2231 to +2233
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the comment above, I'd use more specific names for these functions. We are in monitor_t, so monitor is probably redundant, but i'd use socket_handle.

Also, the const overload should be removed. There are no functions in the C zmq API that accept a const void* socket. I now they exist for other classes, but that's rather a legacy.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.


operator socket_ref() ZMQ_NOTHROW { return (zmq::socket_ref) _monitor_socket; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I'd not add an operator here but rather a named socket() member function. Actually, then the handle/socket_handle function is not necessary at all, since its handle can be accessed through the socket_ref.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.


#if ZMQ_VERSION_MAJOR >= 4
bool get_event(zmq_event_t& eventMsg, std::string& address, zmq::recv_flags flags = zmq::recv_flags::none)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about the signature of this method. What about

std::optional<std::pair<zmq_event_t, std::string>> get_event(zmq::recv_flags flags = zmq::recv_flags::none)

(This requires C++17 this way, but we can also do this with the fallback to detail::trivial_optional as done for other types)

This removes the ambiguity of whether the original value of eventMsg and address are used and in what cases they are modified.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'm not very familiar with std::optional and will try to implement this way

{
assert(_monitor_socket);

eventMsg.event = 0;
eventMsg.value = 0;
address = std::string();

{
message_t msg;
int rc = zmq_msg_recv(msg.handle(), _monitor_socket.handle(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this use _monitor_socket.recv?

Copy link
Author

@lp35 lp35 May 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, but then it is not possible to use the same behaviour as the old check_event. Otherwise I will need to use a try catch statement

static_cast<int>(flags));

if (rc == -1)
{
if (zmq_errno() == ETERM || zmq_errno() == EAGAIN)
return false;
Comment on lines +2253 to +2254
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this also returns false on ETERM? There's no way to distinguish EAGAIN and ETERM this way, and this seems inconsistent with the behavior of zmq::socket_t::recv.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check https://github.com/zeromq/cppzmq/blob/master/zmq.hpp#L2274.

I just kept same behaviour implemented in check_event(). If this is not mandatory, I can just throw an exception and not use a std::optionnal as described above, but just std::pair<>

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, yes, that's right. It's inconsistent one way or the other, but I think the behaviour of zmq::socket_t::recv is the appropriate one.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what we should implement in this case? Maybe we shall throw an exception?

else
throw error_t();
}

const char *data = msg.data<char>();
memcpy(&eventMsg.event, data, sizeof(uint16_t));
data += sizeof(uint16_t);
memcpy(&eventMsg.value, data, sizeof(int32_t));
}

message_t addrMsg;
int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like above, can't this use _monitor_socket.recv?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See prev comment

Thanks for the extensions, I think these will be really useful.

Please see my comments on individual issues.

Also, please remember to reformat all commits with clang-format.

Could you add this to the contribution guidelines of the project?

Most of the travis builds are failing right now, could you give them a look and fix them, please?

I don't like to put effort on something that will be not merged. I was waiting for a green thumb before putting more work on this, as I did not asked anyone about the design of this PR ;)

Thanks for reviewing, will try to solve all of those next week. BR

static_cast<int>(flags));

if (rc == -1)
{
if (zmq_errno() == ETERM)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, why does this return false rather than throwing?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above

return false;
else
throw error_t();
}

const char *str = addrMsg.data<char>();
address = std::string(str, str + addrMsg.size());

return true;
}
#endif

bool check_event(int timeout = 0)
{
assert(_monitor_socket);
Expand Down Expand Up @@ -2357,6 +2414,15 @@ class monitor_t
_socket = socket_ref();
}
#endif

void close() ZMQ_NOTHROW
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a test for this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be done.

{
#ifdef ZMQ_EVENT_MONITOR_STOPPED
abort();
#endif
_monitor_socket = socket_t();
}

virtual void on_monitor_started() {}
virtual void on_event_connected(const zmq_event_t &event_, const char *addr_)
{
Expand Down Expand Up @@ -2461,13 +2527,6 @@ class monitor_t

socket_ref _socket;
socket_t _monitor_socket;

void close() ZMQ_NOTHROW
{
if (_socket)
zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0);
_monitor_socket.close();
}
};

#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
Expand Down