Skip to content

Commit

Permalink
enh(core): optimize zmq communication (MON-6761) (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
garnier-quentin authored and Evan-Adam committed Jul 16, 2024
1 parent 83dc3cf commit 16cec09
Show file tree
Hide file tree
Showing 21 changed files with 295 additions and 326 deletions.
7 changes: 3 additions & 4 deletions gorgone/gorgone/class/clientzmq.pm
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ sub event {
$connectors->{$options{identity}}->{ping_time} = time();
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{$options{identity}});

last if (!defined($message));

# in progress
if ($connectors->{$options{identity}}->{handshake} == 0) {
$connectors->{$options{identity}}->{handshake} = 1;
Expand Down Expand Up @@ -272,9 +273,7 @@ sub event {
if (defined($callbacks->{$options{identity}})) {
$callbacks->{$options{identity}}->(identity => $options{identity}, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $sockets->{$options{identity}}));
}
}
}

Expand Down
2 changes: 0 additions & 2 deletions gorgone/gorgone/class/core.pm
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,6 @@ sub router_internal_event {
code => $code,
token => $token
);
last unless (gorgone::standard::library::zmq_still_read(socket => $gorgone->{internal_socket}));
}
}

Expand Down Expand Up @@ -713,7 +712,6 @@ sub router_external_event {
data => $response
);
}
last unless (gorgone::standard::library::zmq_still_read(socket => $gorgone->{external_socket}));
}
}

Expand Down
3 changes: 1 addition & 2 deletions gorgone/gorgone/modules/centreon/anomalydetection/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ sub action_saasregister {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});
last if (!defined($message));

$connector->{logger}->writeLogDebug("[anomalydetection] Event: $message");
if ($message =~ /^\[(.*?)\]/) {
Expand All @@ -636,8 +637,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/centreon/autodiscovery/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,8 @@ sub is_hdisco_synced {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("[autodiscovery] Event: $message");
if ($message =~ /^\[(.*?)\]/) {
if ((my $method = $connector->can('action_' . lc($1)))) {
Expand All @@ -1070,8 +1071,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
7 changes: 3 additions & 4 deletions gorgone/gorgone/modules/centreon/engine/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,13 @@ sub create_child {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("[engine] Event: $message");

if ($message !~ /^\[ACK\]/) {
$connector->create_child(message => $message);
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/centreon/judge/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,8 @@ sub action_judgelistener {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("[judge] -class- event: $message");
if ($message =~ /^\[(.*?)\]/) {
if ((my $method = $connector->can('action_' . lc($1)))) {
Expand All @@ -397,8 +398,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
3 changes: 1 addition & 2 deletions gorgone/gorgone/modules/centreon/legacycmd/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ sub action_centreoncommand {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});
last if (!defined($message));

$connector->{logger}->writeLogDebug("[legacycmd] Event: $message");
if ($message =~ /^\[(.*?)\]/) {
Expand All @@ -736,8 +737,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/centreon/nodes/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ sub action_nodesresync {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("[nodes] Event: $message");
if ($message =~ /^\[(.*?)\]/) {
if ((my $method = $connector->can('action_' . lc($1)))) {
Expand All @@ -221,8 +222,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/centreon/statistics/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,8 @@ sub rrd_update {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("[statistics] Event: $message");
if ($message =~ /^\[(.*?)\]/) {
if ((my $method = $connector->can('action_' . lc($1)))) {
Expand All @@ -595,8 +596,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
7 changes: 3 additions & 4 deletions gorgone/gorgone/modules/core/action/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,9 @@ sub create_child {

sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});
last if (!defined($message));

$connector->{logger}->writeLogDebug("[action] Event: $message");

if ($message !~ /^\[ACK\]/) {
Expand All @@ -415,8 +416,6 @@ sub event {
$connector->create_child(action => $action, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/core/cron/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ sub action_deletecron {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});
last if (!defined($message));

$connector->{logger}->writeLogDebug("[cron] Event: $message");
if ($message =~ /^\[ACK\]\s+\[(.*?)\]\s+(.*)$/m) {
Expand All @@ -390,9 +391,7 @@ sub event {
my $data = JSON::XS->new->utf8->decode($3);
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/core/dbcleaner/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ sub action_dbclean {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("[dbcleaner] Event: $message");
if ($message =~ /^\[(.*?)\]/) {
if ((my $method = $connector->can('action_' . lc($1)))) {
Expand All @@ -163,8 +164,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
3 changes: 1 addition & 2 deletions gorgone/gorgone/modules/core/httpserver/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,9 @@ sub class_handle_HUP {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});
last if (!defined($message));

$connector->{logger}->writeLogDebug("[httpserver] Event: $message");

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/core/pipeline/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ sub check_timeout {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("[pipeline] -class- event: $message");
if ($message =~ /^\[(.*?)\]/) {
if ((my $method = $connector->can('action_' . lc($1)))) {
Expand All @@ -225,8 +226,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
2 changes: 1 addition & 1 deletion gorgone/gorgone/modules/core/proxy/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ sub event_internal {
my $socket = $options{channel} eq 'control' ? $connector->{internal_socket} : $connector->{internal_channels}->{ $options{channel} };
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $socket);
last if (!defined($message));

proxy(message => $message, channel => $options{channel});
if ($connector->{stop} == 1 && (time() - $connector->{exit_timeout}) > $connector->{stop_time}) {
Expand All @@ -468,7 +469,6 @@ sub event_internal {
defined($connector->{clients}->{ $options{channel} }) &&
($connector->{clients}->{ $options{channel} }->{com_read_internal} == 0 || $connector->{clients}->{ $options{channel} }->{delete} == 1)
);
last unless (gorgone::standard::library::zmq_still_read(socket => $socket));
}
}

Expand Down
3 changes: 2 additions & 1 deletion gorgone/gorgone/modules/core/pull/hooks.pm
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ sub transmit_back {
sub from_router {
while (1) {
my $message = transmit_back(message => gorgone::standard::library::zmq_dealer_read_message(socket => $socket_to_internal));
last if (!defined($message));

# Only send back SETLOGS and PONG
if (defined($message)) {
$logger->writeLogDebug("[pull] Read message from internal: $message");
$client->send_message(message => $message);
}
last unless (gorgone::standard::library::zmq_still_read(socket => $socket_to_internal));
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/core/register/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ sub action_registerresync {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("[register] Event: $message");
if ($message =~ /^\[(.*?)\]/) {
if ((my $method = $connector->can('action_' . lc($1)))) {
Expand All @@ -150,8 +151,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/plugins/newtest/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ sub action_newtestresync {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("gorgone-newtest: class: $message");
if ($message =~ /^\[(.*?)\]/) {
if ((my $method = $connector->can('action_' . lc($1)))) {
Expand All @@ -600,8 +601,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
5 changes: 2 additions & 3 deletions gorgone/gorgone/modules/plugins/scom/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ sub action_scomresync {
sub event {
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket});

last if (!defined($message));

$connector->{logger}->writeLogDebug("[scom] Event: $message");
if ($message =~ /^\[(.*?)\]/) {
if ((my $method = $connector->can('action_' . lc($1)))) {
Expand All @@ -488,8 +489,6 @@ sub event {
$method->($connector, token => $token, data => $data);
}
}

last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket}));
}
}

Expand Down
Loading

0 comments on commit 16cec09

Please sign in to comment.