From fcad79f74b1bb4199e8df6e25a9b8492bd076871 Mon Sep 17 00:00:00 2001 From: qgarnier Date: Thu, 25 Feb 2021 11:46:49 +0100 Subject: [PATCH] enh(core): optimize zmq communication (MON-6761) (#110) --- gorgone/gorgone/class/clientzmq.pm | 7 +- gorgone/gorgone/class/core.pm | 2 - .../centreon/anomalydetection/class.pm | 3 +- .../modules/centreon/autodiscovery/class.pm | 5 +- .../gorgone/modules/centreon/engine/class.pm | 7 +- .../gorgone/modules/centreon/judge/class.pm | 5 +- .../modules/centreon/legacycmd/class.pm | 3 +- .../gorgone/modules/centreon/nodes/class.pm | 5 +- .../modules/centreon/statistics/class.pm | 5 +- gorgone/gorgone/modules/core/action/class.pm | 7 +- gorgone/gorgone/modules/core/cron/class.pm | 5 +- .../gorgone/modules/core/dbcleaner/class.pm | 5 +- .../gorgone/modules/core/httpserver/class.pm | 3 +- .../gorgone/modules/core/pipeline/class.pm | 5 +- gorgone/gorgone/modules/core/proxy/class.pm | 2 +- gorgone/gorgone/modules/core/pull/hooks.pm | 3 +- .../gorgone/modules/core/register/class.pm | 5 +- .../gorgone/modules/plugins/newtest/class.pm | 5 +- gorgone/gorgone/modules/plugins/scom/class.pm | 5 +- gorgone/gorgone/standard/api.pm | 520 +++++++++--------- gorgone/gorgone/standard/library.pm | 14 +- 21 files changed, 295 insertions(+), 326 deletions(-) diff --git a/gorgone/gorgone/class/clientzmq.pm b/gorgone/gorgone/class/clientzmq.pm index c5b427bb262..5b5ed7b2357 100644 --- a/gorgone/gorgone/class/clientzmq.pm +++ b/gorgone/gorgone/class/clientzmq.pm @@ -230,7 +230,8 @@ sub event { $connectors->{$options{identity}}->{ping_time} = time(); while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{$options{identity}}); - + last if (!defined($message)); + # in progress if ($connectors->{$options{identity}}->{handshake} == 0) { $connectors->{$options{identity}}->{handshake} = 1; @@ -272,9 +273,7 @@ sub event { if (defined($callbacks->{$options{identity}})) { $callbacks->{$options{identity}}->(identity => $options{identity}, data => $data); } - } - - last unless (gorgone::standard::library::zmq_still_read(socket => $sockets->{$options{identity}})); + } } } diff --git a/gorgone/gorgone/class/core.pm b/gorgone/gorgone/class/core.pm index dc790f3fea5..d9992533e1b 100644 --- a/gorgone/gorgone/class/core.pm +++ b/gorgone/gorgone/class/core.pm @@ -564,7 +564,6 @@ sub router_internal_event { code => $code, token => $token ); - last unless (gorgone::standard::library::zmq_still_read(socket => $gorgone->{internal_socket})); } } @@ -713,7 +712,6 @@ sub router_external_event { data => $response ); } - last unless (gorgone::standard::library::zmq_still_read(socket => $gorgone->{external_socket})); } } diff --git a/gorgone/gorgone/modules/centreon/anomalydetection/class.pm b/gorgone/gorgone/modules/centreon/anomalydetection/class.pm index 0d9590fc335..e20a68f6d9c 100644 --- a/gorgone/gorgone/modules/centreon/anomalydetection/class.pm +++ b/gorgone/gorgone/modules/centreon/anomalydetection/class.pm @@ -626,6 +626,7 @@ sub action_saasregister { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); + last if (!defined($message)); $connector->{logger}->writeLogDebug("[anomalydetection] Event: $message"); if ($message =~ /^\[(.*?)\]/) { @@ -636,8 +637,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/centreon/autodiscovery/class.pm b/gorgone/gorgone/modules/centreon/autodiscovery/class.pm index b6fd9afd5ef..19f3cbdb352 100644 --- a/gorgone/gorgone/modules/centreon/autodiscovery/class.pm +++ b/gorgone/gorgone/modules/centreon/autodiscovery/class.pm @@ -1060,7 +1060,8 @@ sub is_hdisco_synced { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[autodiscovery] Event: $message"); if ($message =~ /^\[(.*?)\]/) { if ((my $method = $connector->can('action_' . lc($1)))) { @@ -1070,8 +1071,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/centreon/engine/class.pm b/gorgone/gorgone/modules/centreon/engine/class.pm index a7e73d49ec1..cce95f6eaaf 100644 --- a/gorgone/gorgone/modules/centreon/engine/class.pm +++ b/gorgone/gorgone/modules/centreon/engine/class.pm @@ -258,14 +258,13 @@ sub create_child { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[engine] Event: $message"); if ($message !~ /^\[ACK\]/) { $connector->create_child(message => $message); - } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); + } } } diff --git a/gorgone/gorgone/modules/centreon/judge/class.pm b/gorgone/gorgone/modules/centreon/judge/class.pm index 0a1aa1a74ad..7bb41971aee 100644 --- a/gorgone/gorgone/modules/centreon/judge/class.pm +++ b/gorgone/gorgone/modules/centreon/judge/class.pm @@ -387,7 +387,8 @@ sub action_judgelistener { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[judge] -class- event: $message"); if ($message =~ /^\[(.*?)\]/) { if ((my $method = $connector->can('action_' . lc($1)))) { @@ -397,8 +398,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/centreon/legacycmd/class.pm b/gorgone/gorgone/modules/centreon/legacycmd/class.pm index e6b2f8afcd9..3481de517df 100644 --- a/gorgone/gorgone/modules/centreon/legacycmd/class.pm +++ b/gorgone/gorgone/modules/centreon/legacycmd/class.pm @@ -726,6 +726,7 @@ sub action_centreoncommand { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); + last if (!defined($message)); $connector->{logger}->writeLogDebug("[legacycmd] Event: $message"); if ($message =~ /^\[(.*?)\]/) { @@ -736,8 +737,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/centreon/nodes/class.pm b/gorgone/gorgone/modules/centreon/nodes/class.pm index 7f004b682eb..141f94ba318 100644 --- a/gorgone/gorgone/modules/centreon/nodes/class.pm +++ b/gorgone/gorgone/modules/centreon/nodes/class.pm @@ -211,7 +211,8 @@ sub action_nodesresync { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[nodes] Event: $message"); if ($message =~ /^\[(.*?)\]/) { if ((my $method = $connector->can('action_' . lc($1)))) { @@ -221,8 +222,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/centreon/statistics/class.pm b/gorgone/gorgone/modules/centreon/statistics/class.pm index 0f40f934da9..4ed00fcc13c 100644 --- a/gorgone/gorgone/modules/centreon/statistics/class.pm +++ b/gorgone/gorgone/modules/centreon/statistics/class.pm @@ -585,7 +585,8 @@ sub rrd_update { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[statistics] Event: $message"); if ($message =~ /^\[(.*?)\]/) { if ((my $method = $connector->can('action_' . lc($1)))) { @@ -595,8 +596,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/core/action/class.pm b/gorgone/gorgone/modules/core/action/class.pm index 75e4351216d..910e47f5c13 100644 --- a/gorgone/gorgone/modules/core/action/class.pm +++ b/gorgone/gorgone/modules/core/action/class.pm @@ -398,8 +398,9 @@ sub create_child { sub event { while (1) { - my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[action] Event: $message"); if ($message !~ /^\[ACK\]/) { @@ -415,8 +416,6 @@ sub event { $connector->create_child(action => $action, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/core/cron/class.pm b/gorgone/gorgone/modules/core/cron/class.pm index 3e34edff989..1b2dc5786f5 100644 --- a/gorgone/gorgone/modules/core/cron/class.pm +++ b/gorgone/gorgone/modules/core/cron/class.pm @@ -373,6 +373,7 @@ sub action_deletecron { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); + last if (!defined($message)); $connector->{logger}->writeLogDebug("[cron] Event: $message"); if ($message =~ /^\[ACK\]\s+\[(.*?)\]\s+(.*)$/m) { @@ -390,9 +391,7 @@ sub event { my $data = JSON::XS->new->utf8->decode($3); $method->($connector, token => $token, data => $data); } - } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); + } } } diff --git a/gorgone/gorgone/modules/core/dbcleaner/class.pm b/gorgone/gorgone/modules/core/dbcleaner/class.pm index 04b35c1ae8c..2676db0705d 100644 --- a/gorgone/gorgone/modules/core/dbcleaner/class.pm +++ b/gorgone/gorgone/modules/core/dbcleaner/class.pm @@ -153,7 +153,8 @@ sub action_dbclean { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[dbcleaner] Event: $message"); if ($message =~ /^\[(.*?)\]/) { if ((my $method = $connector->can('action_' . lc($1)))) { @@ -163,8 +164,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/core/httpserver/class.pm b/gorgone/gorgone/modules/core/httpserver/class.pm index be7fb31e051..47598877c65 100644 --- a/gorgone/gorgone/modules/core/httpserver/class.pm +++ b/gorgone/gorgone/modules/core/httpserver/class.pm @@ -107,10 +107,9 @@ sub class_handle_HUP { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); + last if (!defined($message)); $connector->{logger}->writeLogDebug("[httpserver] Event: $message"); - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/core/pipeline/class.pm b/gorgone/gorgone/modules/core/pipeline/class.pm index b2bea3a6b1c..cdfd30573f2 100644 --- a/gorgone/gorgone/modules/core/pipeline/class.pm +++ b/gorgone/gorgone/modules/core/pipeline/class.pm @@ -215,7 +215,8 @@ sub check_timeout { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[pipeline] -class- event: $message"); if ($message =~ /^\[(.*?)\]/) { if ((my $method = $connector->can('action_' . lc($1)))) { @@ -225,8 +226,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/core/proxy/class.pm b/gorgone/gorgone/modules/core/proxy/class.pm index c386f15d469..883b8d402f4 100644 --- a/gorgone/gorgone/modules/core/proxy/class.pm +++ b/gorgone/gorgone/modules/core/proxy/class.pm @@ -459,6 +459,7 @@ sub event_internal { my $socket = $options{channel} eq 'control' ? $connector->{internal_socket} : $connector->{internal_channels}->{ $options{channel} }; while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $socket); + last if (!defined($message)); proxy(message => $message, channel => $options{channel}); if ($connector->{stop} == 1 && (time() - $connector->{exit_timeout}) > $connector->{stop_time}) { @@ -468,7 +469,6 @@ sub event_internal { defined($connector->{clients}->{ $options{channel} }) && ($connector->{clients}->{ $options{channel} }->{com_read_internal} == 0 || $connector->{clients}->{ $options{channel} }->{delete} == 1) ); - last unless (gorgone::standard::library::zmq_still_read(socket => $socket)); } } diff --git a/gorgone/gorgone/modules/core/pull/hooks.pm b/gorgone/gorgone/modules/core/pull/hooks.pm index 8fcaca98217..389544f0944 100644 --- a/gorgone/gorgone/modules/core/pull/hooks.pm +++ b/gorgone/gorgone/modules/core/pull/hooks.pm @@ -168,12 +168,13 @@ sub transmit_back { sub from_router { while (1) { my $message = transmit_back(message => gorgone::standard::library::zmq_dealer_read_message(socket => $socket_to_internal)); + last if (!defined($message)); + # Only send back SETLOGS and PONG if (defined($message)) { $logger->writeLogDebug("[pull] Read message from internal: $message"); $client->send_message(message => $message); } - last unless (gorgone::standard::library::zmq_still_read(socket => $socket_to_internal)); } } diff --git a/gorgone/gorgone/modules/core/register/class.pm b/gorgone/gorgone/modules/core/register/class.pm index 2e438f193d1..e5ec3207a2c 100644 --- a/gorgone/gorgone/modules/core/register/class.pm +++ b/gorgone/gorgone/modules/core/register/class.pm @@ -140,7 +140,8 @@ sub action_registerresync { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[register] Event: $message"); if ($message =~ /^\[(.*?)\]/) { if ((my $method = $connector->can('action_' . lc($1)))) { @@ -150,8 +151,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/plugins/newtest/class.pm b/gorgone/gorgone/modules/plugins/newtest/class.pm index c68afeb446f..85a91cda2d6 100644 --- a/gorgone/gorgone/modules/plugins/newtest/class.pm +++ b/gorgone/gorgone/modules/plugins/newtest/class.pm @@ -590,7 +590,8 @@ sub action_newtestresync { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("gorgone-newtest: class: $message"); if ($message =~ /^\[(.*?)\]/) { if ((my $method = $connector->can('action_' . lc($1)))) { @@ -600,8 +601,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/modules/plugins/scom/class.pm b/gorgone/gorgone/modules/plugins/scom/class.pm index 1a3225847e9..bbeddec737e 100644 --- a/gorgone/gorgone/modules/plugins/scom/class.pm +++ b/gorgone/gorgone/modules/plugins/scom/class.pm @@ -478,7 +478,8 @@ sub action_scomresync { sub event { while (1) { my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $connector->{internal_socket}); - + last if (!defined($message)); + $connector->{logger}->writeLogDebug("[scom] Event: $message"); if ($message =~ /^\[(.*?)\]/) { if ((my $method = $connector->can('action_' . lc($1)))) { @@ -488,8 +489,6 @@ sub event { $method->($connector, token => $token, data => $data); } } - - last unless (gorgone::standard::library::zmq_still_read(socket => $connector->{internal_socket})); } } diff --git a/gorgone/gorgone/standard/api.pm b/gorgone/gorgone/standard/api.pm index 8d3a0dc2880..d5508302067 100644 --- a/gorgone/gorgone/standard/api.pm +++ b/gorgone/gorgone/standard/api.pm @@ -1,267 +1,253 @@ -# -# Copyright 2019 Centreon (http://www.centreon.com/) -# -# Centreon is a full-fledged industry-strength solution that meets -# the needs in IT infrastructure and application monitoring for -# service performance. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -package gorgone::standard::api; - -use strict; -use warnings; -use gorgone::standard::library; -use ZMQ::LibZMQ4; -use ZMQ::Constants qw(:all); -use Time::HiRes; -use JSON::XS; - -my $socket; -my $result; - -sub root { - my (%options) = @_; - - $options{logger}->writeLogInfo("[api] Requesting '" . $options{uri} . "' [" . $options{method} . "]"); - - my $response; - if ($options{method} eq 'GET' && $options{uri} =~ /^\/api\/(nodes\/(\w*)\/)?log\/(.*)$/) { - $response = get_log( - socket => $options{socket}, - target => $2, - token => $3, - sync_wait => (defined($options{parameters}->{sync_wait})) ? $options{parameters}->{sync_wait} : undef, - parameters => $options{parameters} - ); - } elsif ($options{uri} =~ /^\/api\/(nodes\/(\w*)\/)?internal\/(\w+)\/?([\w\/]*?)$/ - && defined($options{api_endpoints}->{$options{method} . '_/internal/' . $3})) { - my @variables = split(/\//, $4); - $response = call_internal( - socket => $options{socket}, - action => $options{api_endpoints}->{$options{method} . '_/internal/' . $3}, - target => $2, - data => { - content => $options{content}, - parameters => $options{parameters}, - variables => \@variables, - }, - log_wait => (defined($options{parameters}->{log_wait})) ? $options{parameters}->{log_wait} : undef, - sync_wait => (defined($options{parameters}->{sync_wait})) ? $options{parameters}->{sync_wait} : undef - ); - } elsif ($options{uri} =~ /^\/api\/(nodes\/(\w*)\/)?(\w+)\/(\w+)\/(\w+)\/?([\w\/]*?)$/ - && defined($options{api_endpoints}->{$options{method} . '_/' . $3 . '/' . $4 . '/' . $5})) { - my @variables = split(/\//, $6); - $response = call_action( - socket => $options{socket}, - action => $options{api_endpoints}->{$options{method} . '_/' . $3 . '/' . $4 . '/' . $5}, - target => $2, - data => { - content => $options{content}, - parameters => $options{parameters}, - variables => \@variables, - }, - log_wait => (defined($options{parameters}->{log_wait})) ? $options{parameters}->{log_wait} : undef, - sync_wait => (defined($options{parameters}->{sync_wait})) ? $options{parameters}->{sync_wait} : undef, - ); - } else { - $response = '{"error":"method_unknown","message":"Method not implemented"}'; - } - - return $response; -} - -sub call_action { - my (%options) = @_; - - gorgone::standard::library::zmq_send_message( - socket => $options{socket}, - action => $options{action}, - target => $options{target}, - data => $options{data}, - json_encode => 1 - ); - - $socket = $options{socket}; - my $poll = [ - { - socket => $options{socket}, - events => ZMQ_POLLIN, - callback => \&event, - } - ]; - - my $rev = zmq_poll($poll, 5000); - - my $response = '{"error":"no_token","message":"Cannot retrieve token from ack"}'; - if (defined($result->{token}) && $result->{token} ne '') { - if (defined($options{log_wait}) && $options{log_wait} ne '') { - Time::HiRes::usleep($options{log_wait}); - $response = get_log( - socket => $options{socket}, - target => $options{target}, - token => $result->{token}, - sync_wait => $options{sync_wait}, - parameters => $options{data}->{parameters} - ); - } else { - $response = '{"token":"' . $result->{token} . '"}'; - } - } - - return $response; -} - -sub call_internal { - my (%options) = @_; - - $socket = $options{socket}; - my $poll = [ - { - socket => $options{socket}, - events => ZMQ_POLLIN, - callback => \&event, - } - ]; - - if (defined($options{target}) && $options{target} ne '') { - return call_action( - socket => $options{socket}, - target => $options{target}, - action => $options{action}, - data => $options{data}, - json_encode => 1, - log_wait => $options{log_wait}, - sync_wait => $options{sync_wait} - ); - } - - gorgone::standard::library::zmq_send_message( - socket => $options{socket}, - action => $options{action}, - data => $options{data}, - json_encode => 1 - ); - - my $rev = zmq_poll($poll, 5000); - - my $response = '{"error":"no_result", "message":"No result found for action \'' . $options{action} . '\'"}'; - if (defined($result->{data})) { - my $content; - eval { - $content = JSON::XS->new->utf8->decode($result->{data}); - }; - if ($@) { - $response = '{"error":"decode_error","message":"Cannot decode response"}'; - } else { - if (defined($content->{data})) { - eval { - $response = JSON::XS->new->utf8->encode($content->{data}); - }; - if ($@) { - $response = '{"error":"encode_error","message":"Cannot encode response"}'; - } - } else { - $response = ''; - } - } - } - - return $response; -} - -sub get_log { - my (%options) = @_; - - $socket = $options{socket}; - my $poll = [ - { - socket => $options{socket}, - events => ZMQ_POLLIN, - callback => \&event, - } - ]; - - if (defined($options{target}) && $options{target} ne '') { - gorgone::standard::library::zmq_send_message( - socket => $options{socket}, - target => $options{target}, - action => 'GETLOG', - json_encode => 1 - ); - - my $sync_wait = (defined($options{sync_wait}) && $options{sync_wait} ne '') ? $options{sync_wait} : '10000'; - Time::HiRes::usleep($sync_wait); - - my $rev = zmq_poll($poll, 5000); - } - - gorgone::standard::library::zmq_send_message( - socket => $options{socket}, - action => 'GETLOG', - data => { - token => $options{token}, - %{$options{parameters}} - }, - json_encode => 1 - ); - - my $rev = zmq_poll($poll, 5000); - - my $response = '{"error":"no_log","message":"No log found for token","data":[],"token":"' . $options{token} . '"}'; - if (defined($result->{data})) { - my $content; - eval { - $content = JSON::XS->new->utf8->decode($result->{data}); - }; - if ($@) { - $response = '{"error":"decode_error","message":"Cannot decode response"}'; - } elsif (defined($content->{data}->{result}) && scalar(@{$content->{data}->{result}}) > 0) { - eval { - $response = JSON::XS->new->utf8->encode( - { - message => "Logs found", - token => $options{token}, - data => $content->{data}->{result} - } - ); - }; - if ($@) { - $response = '{"error":"encode_error","message":"Cannot encode response"}'; - } - } - } - - return $response; -} - -sub event { - while (1) { - my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $socket); - - $result = {}; - if ($message =~ /^\[(.*?)\]\s+\[(.*?)\]\s+\[.*?\]\s+(.*)$/m || - $message =~ /^\[(.*?)\]\s+\[(.*?)\]\s+(.*)$/m) { - $result = { - action => $1, - token => $2, - data => $3, - }; - } - - last unless (gorgone::standard::library::zmq_still_read(socket => $socket)); - } -} - -1; +# +# Copyright 2019 Centreon (http://www.centreon.com/) +# +# Centreon is a full-fledged industry-strength solution that meets +# the needs in IT infrastructure and application monitoring for +# service performance. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +package gorgone::standard::api; + +use strict; +use warnings; +use gorgone::standard::library; +use ZMQ::LibZMQ4; +use ZMQ::Constants qw(:all); +use Time::HiRes; +use JSON::XS; + +my $socket; +my $results; +my $action_token; + +sub root { + my (%options) = @_; + + $options{logger}->writeLogInfo("[api] Requesting '" . $options{uri} . "' [" . $options{method} . "]"); + + $action_token = undef; + $socket = $options{socket}; + $results = {}; + + my $response; + if ($options{method} eq 'GET' && $options{uri} =~ /^\/api\/(nodes\/(\w*)\/)?log\/(.*)$/) { + $response = get_log( + target => $2, + token => $3, + sync_wait => (defined($options{parameters}->{sync_wait})) ? $options{parameters}->{sync_wait} : undef, + parameters => $options{parameters} + ); + } elsif ($options{uri} =~ /^\/api\/(nodes\/(\w*)\/)?internal\/(\w+)\/?([\w\/]*?)$/ + && defined($options{api_endpoints}->{$options{method} . '_/internal/' . $3})) { + my @variables = split(/\//, $4); + $response = call_internal( + action => $options{api_endpoints}->{$options{method} . '_/internal/' . $3}, + target => $2, + data => { + content => $options{content}, + parameters => $options{parameters}, + variables => \@variables + }, + log_wait => (defined($options{parameters}->{log_wait})) ? $options{parameters}->{log_wait} : undef, + sync_wait => (defined($options{parameters}->{sync_wait})) ? $options{parameters}->{sync_wait} : undef + ); + } elsif ($options{uri} =~ /^\/api\/(nodes\/(\w*)\/)?(\w+)\/(\w+)\/(\w+)\/?([\w\/]*?)$/ + && defined($options{api_endpoints}->{$options{method} . '_/' . $3 . '/' . $4 . '/' . $5})) { + my @variables = split(/\//, $6); + $response = call_action( + action => $options{api_endpoints}->{$options{method} . '_/' . $3 . '/' . $4 . '/' . $5}, + target => $2, + data => { + content => $options{content}, + parameters => $options{parameters}, + variables => \@variables + }, + log_wait => (defined($options{parameters}->{log_wait})) ? $options{parameters}->{log_wait} : undef, + sync_wait => (defined($options{parameters}->{sync_wait})) ? $options{parameters}->{sync_wait} : undef + ); + } else { + $response = '{"error":"method_unknown","message":"Method not implemented"}'; + } + + return $response; +} + +sub call_action { + my (%options) = @_; + + $action_token = gorgone::standard::library::generate_token() if (!defined($options{token})); + + gorgone::standard::library::zmq_send_message( + socket => $socket, + action => $options{action}, + target => $options{target}, + token => $action_token, + data => $options{data}, + json_encode => 1 + ); + + my $response = '{"token":"' . $action_token . '"}'; + if (defined($options{log_wait}) && $options{log_wait} ne '') { + Time::HiRes::usleep($options{log_wait}); + $response = get_log( + target => $options{target}, + token => $action_token, + sync_wait => $options{sync_wait}, + parameters => $options{data}->{parameters} + ); + } + + return $response; +} + +sub call_internal { + my (%options) = @_; + + my $poll = [ + { + socket => $socket, + events => ZMQ_POLLIN, + callback => \&event + } + ]; + + $action_token = gorgone::standard::library::generate_token(); + if (defined($options{target}) && $options{target} ne '') { + return call_action( + target => $options{target}, + action => $options{action}, + token => $action_token, + data => $options{data}, + json_encode => 1, + log_wait => $options{log_wait}, + sync_wait => $options{sync_wait} + ); + } + + gorgone::standard::library::zmq_send_message( + socket => $socket, + action => $options{action}, + token => $action_token, + data => $options{data}, + json_encode => 1 + ); + + my $rev = zmq_poll($poll, 5000); + + my $response = '{"error":"no_result", "message":"No result found for action \'' . $options{action} . '\'"}'; + if (defined($results->{$action_token}->{data})) { + my $content; + eval { + $content = JSON::XS->new->utf8->decode($results->{$action_token}->{data}); + }; + if ($@) { + $response = '{"error":"decode_error","message":"Cannot decode response"}'; + } else { + if (defined($content->{data})) { + eval { + $response = JSON::XS->new->utf8->encode($content->{data}); + }; + if ($@) { + $response = '{"error":"encode_error","message":"Cannot encode response"}'; + } + } else { + $response = ''; + } + } + } + + return $response; +} + +sub get_log { + my (%options) = @_; + + my $poll = [ + { + socket => $socket, + events => ZMQ_POLLIN, + callback => \&event + } + ]; + + if (defined($options{target}) && $options{target} ne '') { + gorgone::standard::library::zmq_send_message( + socket => $socket, + target => $options{target}, + action => 'GETLOG', + json_encode => 1 + ); + + my $sync_wait = (defined($options{sync_wait}) && $options{sync_wait} ne '') ? $options{sync_wait} : 10000; + Time::HiRes::usleep($sync_wait); + } + + gorgone::standard::library::zmq_send_message( + socket => $socket, + action => 'GETLOG', + token => $options{token}, + data => { + token => $options{token}, + %{$options{parameters}} + }, + json_encode => 1 + ); + + my $rev = zmq_poll($poll, 5000); + + my $response = '{"error":"no_log","message":"No log found for token","data":[],"token":"' . $options{token} . '"}'; + if (defined($results->{ $options{token} }) && defined($results->{ $options{token} }->{data})) { + my $content; + eval { + $content = JSON::XS->new->utf8->decode($results->{ $options{token} }->{data}); + }; + if ($@) { + $response = '{"error":"decode_error","message":"Cannot decode response"}'; + } elsif (defined($content->{data}->{result}) && scalar(@{$content->{data}->{result}}) > 0) { + eval { + $response = JSON::XS->new->utf8->encode( + { + message => "Logs found", + token => $options{token}, + data => $content->{data}->{result} + } + ); + }; + if ($@) { + $response = '{"error":"encode_error","message":"Cannot encode response"}'; + } + } + } + + return $response; +} + +sub event { + while (1) { + my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $socket); + last if (!defined($message)); + + if ($message =~ /^\[(.*?)\]\s+\[(.*?)\]\s+\[.*?\]\s+(.*)$/m || + $message =~ /^\[(.*?)\]\s+\[(.*?)\]\s+(.*)$/m) { + $results->{$2} = { + action => $1, + token => $2, + data => $3 + }; + } + } +} + +1; diff --git a/gorgone/gorgone/standard/library.pm b/gorgone/gorgone/standard/library.pm index a1548dfb504..c78ddada054 100644 --- a/gorgone/gorgone/standard/library.pm +++ b/gorgone/gorgone/standard/library.pm @@ -830,10 +830,10 @@ sub zmq_send_message { sub zmq_dealer_read_message { my (%options) = @_; - # Process all parts of the message - my $message = zmq_recvmsg($options{socket}); - my $data = zmq_msg_data($message); + my $message = zmq_recvmsg($options{socket}, ZMQ_DONTWAIT); + return undef if (!defined($message)); + my $data = zmq_msg_data($message); return $data; } @@ -841,9 +841,9 @@ sub zmq_read_message { my (%options) = @_; # Process all parts of the message - my $message = zmq_recvmsg($options{socket}); + my $message = zmq_recvmsg($options{socket}, ZMQ_DONTWAIT); if (!defined($message)) { - $options{logger}->writeLogError("[core] zmq_recvmsg error: $!"); + $options{logger}->writeLogDebug("[core] zmq_recvmsg error: $!"); return undef; } my $identity = zmq_msg_data($message); @@ -852,9 +852,9 @@ sub zmq_read_message { $options{logger}->writeLogError("[core] unknown identity: $identity"); return undef; } - $message = zmq_recvmsg($options{socket}); + $message = zmq_recvmsg($options{socket}, ZMQ_DONTWAIT); if (!defined($message)) { - $options{logger}->writeLogError("[core] zmq_recvmsg error: $!"); + $options{logger}->writeLogDebug("[core] zmq_recvmsg error: $!"); return undef; } my $data = zmq_msg_data($message);