From 72412f0a67bcb797c0bcfd1de8bed00642914fa8 Mon Sep 17 00:00:00 2001 From: Bartosz Zurkowski Date: Fri, 27 Mar 2020 19:47:32 +0100 Subject: [PATCH 1/6] Add generic ingestor setup interface Signed-off-by: Bartosz Zurkowski --- orca/api/resources/v1/ingestor.py | 61 +++++++-------------- orca/topology/alerts/elastalert/ingestor.py | 23 ++++++++ orca/topology/alerts/elastalert/manager.py | 22 ++++---- orca/topology/alerts/falco/ingestor.py | 23 ++++++++ orca/topology/alerts/falco/manager.py | 22 ++++---- orca/topology/alerts/prometheus/ingestor.py | 27 +++++++++ orca/topology/alerts/prometheus/manager.py | 17 +++--- orca/topology/bundle.py | 10 ++++ orca/topology/ingestor.py | 24 ++++++-- 9 files changed, 151 insertions(+), 78 deletions(-) create mode 100644 orca/topology/alerts/elastalert/ingestor.py create mode 100644 orca/topology/alerts/falco/ingestor.py create mode 100644 orca/topology/alerts/prometheus/ingestor.py diff --git a/orca/api/resources/v1/ingestor.py b/orca/api/resources/v1/ingestor.py index 952f7e6..8d080e6 100644 --- a/orca/api/resources/v1/ingestor.py +++ b/orca/api/resources/v1/ingestor.py @@ -19,62 +19,39 @@ from orca import exceptions from orca.common import logger -from orca.topology.alerts.elastalert import manager as es +from orca.topology import linker +from orca.topology.alerts.elastalert import manager as elastalert from orca.topology.alerts.falco import manager as falco from orca.topology.alerts.prometheus import manager as prometheus LOG = logger.get_logger(__name__) -class Ingestor(Resource): +class IngestorResource(Resource): - """Base class for alert ingestors.""" + """Endpoint for ingesting events from the upstream.""" - def __init__(self, api, entity_handler): + def __init__(self, api, ingestor): super().__init__() - self._entity_handler = entity_handler + self._ingestor = ingestor def post(self): payload = request.json - LOG.debug("Ingested an entity: %s", json.dumps(payload)) - self._ingest(payload) - - def _ingest(self, entity): - try: - self._entity_handler.handle_event(entity) - except exceptions.OrcaError as ex: - LOG.debug("Error while processing an entity: %s", ex) - - -class Prometheus(Ingestor): - - """Prometheus ingest endpoint.""" - - def post(self): - payload = request.json - for alert in payload['alerts']: - self._ingest(alert) - - -class Falco(Ingestor): - - """Falco ingest endpoint.""" - - -class Elastalert(Ingestor): - - """Elastalert ingest endpoint.""" + self._ingestor.ingest(payload) def initialize(graph): api = Namespace('ingestor', description='Ingestor API') - api.add_resource( - Prometheus, '/prometheus', - resource_class_args=[prometheus.initialize_handler(graph)]) - api.add_resource( - Falco, '/falco', - resource_class_args=[falco.initialize_handler(graph)]) - api.add_resource( - Elastalert, '/elastalert', - resource_class_args=[es.initialize_handler(graph)]) + ingestor_modules = [prometheus, falco, elastalert] + linkers = [] + for ingestor_module in ingestor_modules: + for ingestor_bundle in ingestor_module.get_ingestors(): + for linker_module in ingestor_bundle.linkers: + linkers.append(linker_module.get(graph)) + api.add_resource(IngestorResource, "/%s" % ingestor_bundle.name.lower(), + resource_class_args=(ingestor_bundle.ingestor.get(graph),)) + dispatcher = linker.EventDispatcher() + for linker_instance in linkers: + dispatcher.add_linker(linker_instance) + graph.add_listener(dispatcher) return api diff --git a/orca/topology/alerts/elastalert/ingestor.py b/orca/topology/alerts/elastalert/ingestor.py new file mode 100644 index 0000000..8b30b4f --- /dev/null +++ b/orca/topology/alerts/elastalert/ingestor.py @@ -0,0 +1,23 @@ +# Copyright 2020 OpenRCA Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orca.topology import ingestor +from orca.topology.alerts.elastalert import extractor + + +class AlertIngestor(ingestor.Ingestor): + + @classmethod + def get(cls, graph): + return cls(graph, extractor.AlertExtractor.get()) diff --git a/orca/topology/alerts/elastalert/manager.py b/orca/topology/alerts/elastalert/manager.py index 66f69fc..ba04b2f 100644 --- a/orca/topology/alerts/elastalert/manager.py +++ b/orca/topology/alerts/elastalert/manager.py @@ -12,17 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from orca.topology import ingestor -from orca.topology.alerts.elastalert import extractor, linker +from orca.topology import bundle +from orca.topology.alerts.elastalert import linker, ingestor -def initialize_probes(graph): - return [] - - -def initialize_linkers(graph): - return [linker.AlertLinker.get(graph)] - - -def initialize_handler(graph): - return ingestor.EventHandler(graph, extractor.AlertExtractor.get()) +def get_ingestors(): + return [ + bundle.IngestorBundle( + name='elastalert', + ingestor=ingestor.AlertIngestor, + linkers=[linker.AlertLinker] + ) + ] diff --git a/orca/topology/alerts/falco/ingestor.py b/orca/topology/alerts/falco/ingestor.py new file mode 100644 index 0000000..6e3b61c --- /dev/null +++ b/orca/topology/alerts/falco/ingestor.py @@ -0,0 +1,23 @@ +# Copyright 2020 OpenRCA Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orca.topology import ingestor +from orca.topology.alerts.falco import extractor + + +class AlertIngestor(ingestor.Ingestor): + + @classmethod + def get(cls, graph): + return cls(graph, extractor.AlertExtractor.get()) diff --git a/orca/topology/alerts/falco/manager.py b/orca/topology/alerts/falco/manager.py index f86f8a0..c533dae 100644 --- a/orca/topology/alerts/falco/manager.py +++ b/orca/topology/alerts/falco/manager.py @@ -12,17 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from orca.topology import ingestor -from orca.topology.alerts.falco import extractor, linker +from orca.topology import bundle +from orca.topology.alerts.falco import linker, ingestor -def initialize_probes(graph): - return [] - - -def initialize_linkers(graph): - return [linker.AlertLinker.get(graph)] - - -def initialize_handler(graph): - return ingestor.EventHandler(graph, extractor.AlertExtractor.get()) +def get_ingestors(): + return [ + bundle.IngestorBundle( + name='falco', + ingestor=ingestor.AlertIngestor, + linkers=[linker.AlertLinker] + ) + ] diff --git a/orca/topology/alerts/prometheus/ingestor.py b/orca/topology/alerts/prometheus/ingestor.py new file mode 100644 index 0000000..e4a55a1 --- /dev/null +++ b/orca/topology/alerts/prometheus/ingestor.py @@ -0,0 +1,27 @@ +# Copyright 2020 OpenRCA Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from orca.topology import ingestor +from orca.topology.alerts.prometheus import extractor + + +class AlertIngestor(ingestor.Ingestor): + + def ingest(self, event): + for alert in event['alerts']: + super().ingest(alert) + + @classmethod + def get(cls, graph): + return cls(graph, extractor.AlertEventExtractor.get()) diff --git a/orca/topology/alerts/prometheus/manager.py b/orca/topology/alerts/prometheus/manager.py index df38f33..e0e6e05 100644 --- a/orca/topology/alerts/prometheus/manager.py +++ b/orca/topology/alerts/prometheus/manager.py @@ -12,11 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from orca.common import config -from orca.topology import bundle, ingestor -from orca.topology.alerts.prometheus import extractor, linker, probe - -CONFIG = config.CONFIG +from orca.topology import bundle +from orca.topology.alerts.prometheus import ingestor, linker, probe def get_bundles(): @@ -28,5 +25,11 @@ def get_bundles(): ] -def initialize_handler(graph): - return ingestor.EventHandler(graph, extractor.AlertEventExtractor.get()) +def get_ingestors(): + return [ + bundle.IngestorBundle( + name='prometheus', + ingestor=ingestor.AlertIngestor, + linkers=[linker.AlertLinker] + ) + ] diff --git a/orca/topology/bundle.py b/orca/topology/bundle.py index df3aed1..75f1a76 100644 --- a/orca/topology/bundle.py +++ b/orca/topology/bundle.py @@ -20,3 +20,13 @@ class ProbeBundle(object): def __init__(self, probe, linkers): self.probe = probe self.linkers = linkers + + +class IngestorBundle(object): + + """Value object holding ingestor runtime spec.""" + + def __init__(self, name, ingestor, linkers): + self.name = name + self.ingestor = ingestor + self.linkers = linkers diff --git a/orca/topology/ingestor.py b/orca/topology/ingestor.py index c75df3f..edf77c6 100644 --- a/orca/topology/ingestor.py +++ b/orca/topology/ingestor.py @@ -12,15 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. +from orca import exceptions +from orca.common import logger -class EventHandler(object): +LOG = logger.get_logger(__name__) - """Processes entity events received by ingestor.""" + +class Ingestor(object): + + """Processes entity events received from the upstream.""" def __init__(self, graph, extractor): self._graph = graph self._extractor = extractor - def handle_event(self, entity): - node = self._extractor.extract(entity) - self._graph.add_node(node) + def ingest(self, event): + try: + self._ingest_event(event) + except exceptions.OrcaError as ex: + LOG.debug("Error while extracting an entity: %s", ex) + + def _ingest_event(self, event): + node = self._extractor.extract(event) + if self._graph.get_node(node.id): + self._graph.update_node(node) + else: + self._graph.add_node(node) From e3ab9af57d58c806ff1ac952ee06377458456dfa Mon Sep 17 00:00:00 2001 From: Bartosz Zurkowski Date: Fri, 27 Mar 2020 19:51:19 +0100 Subject: [PATCH 2/6] Rename get_bundles to get_probes Signed-off-by: Bartosz Zurkowski --- orca/topology/alerts/prometheus/manager.py | 2 +- orca/topology/infra/istio/manager.py | 2 +- orca/topology/infra/k8s/manager.py | 2 +- orca/topology/infra/kiali/manager.py | 2 +- orca/topology/manager.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/orca/topology/alerts/prometheus/manager.py b/orca/topology/alerts/prometheus/manager.py index e0e6e05..1f0fb69 100644 --- a/orca/topology/alerts/prometheus/manager.py +++ b/orca/topology/alerts/prometheus/manager.py @@ -16,7 +16,7 @@ from orca.topology.alerts.prometheus import ingestor, linker, probe -def get_bundles(): +def get_probes(): return [ bundle.ProbeBundle( probe=probe.AlertProbe, diff --git a/orca/topology/infra/istio/manager.py b/orca/topology/infra/istio/manager.py index eeeac3b..0c62043 100644 --- a/orca/topology/infra/istio/manager.py +++ b/orca/topology/infra/istio/manager.py @@ -16,7 +16,7 @@ from orca.topology import bundle -def get_bundles(): +def get_probes(): return [ bundle.ProbeBundle( probe=probe.VirtualServicePullProbe, diff --git a/orca/topology/infra/k8s/manager.py b/orca/topology/infra/k8s/manager.py index 5295e23..c318975 100644 --- a/orca/topology/infra/k8s/manager.py +++ b/orca/topology/infra/k8s/manager.py @@ -17,7 +17,7 @@ from orca.topology.infra.k8s import cluster, linker, probe -def get_bundles(): +def get_probes(): return [ bundle.ProbeBundle( probe=probe.PodPullProbe, diff --git a/orca/topology/infra/kiali/manager.py b/orca/topology/infra/kiali/manager.py index 486dea2..7774474 100644 --- a/orca/topology/infra/kiali/manager.py +++ b/orca/topology/infra/kiali/manager.py @@ -16,7 +16,7 @@ from orca.topology import bundle -def get_bundles(): +def get_probes(): return [ bundle.ProbeBundle( probe=probe.ServiceGraphProbe, diff --git a/orca/topology/manager.py b/orca/topology/manager.py index 6d11afb..7bcb7cd 100644 --- a/orca/topology/manager.py +++ b/orca/topology/manager.py @@ -34,5 +34,5 @@ def initialize(self): graph_lock = multiprocessing.Lock() probe_managers = [k8s, istio, prom, kiali] for probe_manager in probe_managers: - for probe_bundle in probe_manager.get_bundles(): + for probe_bundle in probe_manager.get_probes(): self.add(probe.ProbeRunner, workers=1, args=(probe_bundle, graph_lock)) From f44fc87dbb27052a8d2ed7c6fc0f4301161c8e79 Mon Sep 17 00:00:00 2001 From: Bartosz Zurkowski Date: Fri, 27 Mar 2020 19:55:21 +0100 Subject: [PATCH 3/6] Remove probe managers Signed-off-by: Bartosz Zurkowski --- orca/api/resources/v1/ingestor.py | 6 +- orca/topology/alerts/elastalert/__init__.py | 13 + orca/topology/alerts/elastalert/manager.py | 26 -- orca/topology/alerts/falco/__init__.py | 13 + orca/topology/alerts/falco/manager.py | 26 -- orca/topology/alerts/prometheus/__init__.py | 22 ++ orca/topology/alerts/prometheus/manager.py | 35 --- orca/topology/infra/istio/__init__.py | 51 ++++ orca/topology/infra/istio/manager.py | 64 ----- orca/topology/infra/k8s/__init__.py | 249 +++++++++++++++++++ orca/topology/infra/k8s/manager.py | 262 -------------------- orca/topology/infra/kiali/__init__.py | 12 + orca/topology/infra/kiali/manager.py | 25 -- orca/topology/manager.py | 14 +- 14 files changed, 370 insertions(+), 448 deletions(-) delete mode 100644 orca/topology/alerts/elastalert/manager.py delete mode 100644 orca/topology/alerts/falco/manager.py delete mode 100644 orca/topology/alerts/prometheus/manager.py delete mode 100644 orca/topology/infra/istio/manager.py delete mode 100644 orca/topology/infra/k8s/manager.py delete mode 100644 orca/topology/infra/kiali/manager.py diff --git a/orca/api/resources/v1/ingestor.py b/orca/api/resources/v1/ingestor.py index 8d080e6..9f95c22 100644 --- a/orca/api/resources/v1/ingestor.py +++ b/orca/api/resources/v1/ingestor.py @@ -20,9 +20,9 @@ from orca import exceptions from orca.common import logger from orca.topology import linker -from orca.topology.alerts.elastalert import manager as elastalert -from orca.topology.alerts.falco import manager as falco -from orca.topology.alerts.prometheus import manager as prometheus +from orca.topology.alerts import elastalert +from orca.topology.alerts import falco +from orca.topology.alerts import prometheus LOG = logger.get_logger(__name__) diff --git a/orca/topology/alerts/elastalert/__init__.py b/orca/topology/alerts/elastalert/__init__.py index ff6d210..ba04b2f 100644 --- a/orca/topology/alerts/elastalert/__init__.py +++ b/orca/topology/alerts/elastalert/__init__.py @@ -11,3 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from orca.topology import bundle +from orca.topology.alerts.elastalert import linker, ingestor + + +def get_ingestors(): + return [ + bundle.IngestorBundle( + name='elastalert', + ingestor=ingestor.AlertIngestor, + linkers=[linker.AlertLinker] + ) + ] diff --git a/orca/topology/alerts/elastalert/manager.py b/orca/topology/alerts/elastalert/manager.py deleted file mode 100644 index ba04b2f..0000000 --- a/orca/topology/alerts/elastalert/manager.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2020 OpenRCA Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from orca.topology import bundle -from orca.topology.alerts.elastalert import linker, ingestor - - -def get_ingestors(): - return [ - bundle.IngestorBundle( - name='elastalert', - ingestor=ingestor.AlertIngestor, - linkers=[linker.AlertLinker] - ) - ] diff --git a/orca/topology/alerts/falco/__init__.py b/orca/topology/alerts/falco/__init__.py index ff6d210..c533dae 100644 --- a/orca/topology/alerts/falco/__init__.py +++ b/orca/topology/alerts/falco/__init__.py @@ -11,3 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from orca.topology import bundle +from orca.topology.alerts.falco import linker, ingestor + + +def get_ingestors(): + return [ + bundle.IngestorBundle( + name='falco', + ingestor=ingestor.AlertIngestor, + linkers=[linker.AlertLinker] + ) + ] diff --git a/orca/topology/alerts/falco/manager.py b/orca/topology/alerts/falco/manager.py deleted file mode 100644 index c533dae..0000000 --- a/orca/topology/alerts/falco/manager.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2020 OpenRCA Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from orca.topology import bundle -from orca.topology.alerts.falco import linker, ingestor - - -def get_ingestors(): - return [ - bundle.IngestorBundle( - name='falco', - ingestor=ingestor.AlertIngestor, - linkers=[linker.AlertLinker] - ) - ] diff --git a/orca/topology/alerts/prometheus/__init__.py b/orca/topology/alerts/prometheus/__init__.py index ff6d210..1f0fb69 100644 --- a/orca/topology/alerts/prometheus/__init__.py +++ b/orca/topology/alerts/prometheus/__init__.py @@ -11,3 +11,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from orca.topology import bundle +from orca.topology.alerts.prometheus import ingestor, linker, probe + + +def get_probes(): + return [ + bundle.ProbeBundle( + probe=probe.AlertProbe, + linkers=[linker.AlertLinker] + ) + ] + + +def get_ingestors(): + return [ + bundle.IngestorBundle( + name='prometheus', + ingestor=ingestor.AlertIngestor, + linkers=[linker.AlertLinker] + ) + ] diff --git a/orca/topology/alerts/prometheus/manager.py b/orca/topology/alerts/prometheus/manager.py deleted file mode 100644 index 1f0fb69..0000000 --- a/orca/topology/alerts/prometheus/manager.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2020 OpenRCA Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from orca.topology import bundle -from orca.topology.alerts.prometheus import ingestor, linker, probe - - -def get_probes(): - return [ - bundle.ProbeBundle( - probe=probe.AlertProbe, - linkers=[linker.AlertLinker] - ) - ] - - -def get_ingestors(): - return [ - bundle.IngestorBundle( - name='prometheus', - ingestor=ingestor.AlertIngestor, - linkers=[linker.AlertLinker] - ) - ] diff --git a/orca/topology/infra/istio/__init__.py b/orca/topology/infra/istio/__init__.py index ff6d210..0c62043 100644 --- a/orca/topology/infra/istio/__init__.py +++ b/orca/topology/infra/istio/__init__.py @@ -11,3 +11,54 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from orca.topology.infra.istio import linker, probe +from orca.topology import bundle + + +def get_probes(): + return [ + bundle.ProbeBundle( + probe=probe.VirtualServicePullProbe, + linkers=[ + linker.VirtualServiceToGatewayLinker, + linker.VirtualServiceToServiceLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.VirtualServicePushProbe, + linkers=[ + linker.VirtualServiceToGatewayLinker, + linker.VirtualServiceToServiceLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.DestinationRulePullProbe, + linkers=[ + linker.DestinationRuleToServiceLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.DestinationRulePushProbe, + linkers=[ + linker.DestinationRuleToServiceLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.GatewayPullProbe, + linkers=[ + linker.VirtualServiceToGatewayLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.GatewayPushProbe, + linkers=[ + linker.VirtualServiceToGatewayLinker + ] + ), + ] diff --git a/orca/topology/infra/istio/manager.py b/orca/topology/infra/istio/manager.py deleted file mode 100644 index 0c62043..0000000 --- a/orca/topology/infra/istio/manager.py +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright 2020 OpenRCA Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from orca.topology.infra.istio import linker, probe -from orca.topology import bundle - - -def get_probes(): - return [ - bundle.ProbeBundle( - probe=probe.VirtualServicePullProbe, - linkers=[ - linker.VirtualServiceToGatewayLinker, - linker.VirtualServiceToServiceLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.VirtualServicePushProbe, - linkers=[ - linker.VirtualServiceToGatewayLinker, - linker.VirtualServiceToServiceLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.DestinationRulePullProbe, - linkers=[ - linker.DestinationRuleToServiceLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.DestinationRulePushProbe, - linkers=[ - linker.DestinationRuleToServiceLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.GatewayPullProbe, - linkers=[ - linker.VirtualServiceToGatewayLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.GatewayPushProbe, - linkers=[ - linker.VirtualServiceToGatewayLinker - ] - ), - ] diff --git a/orca/topology/infra/k8s/__init__.py b/orca/topology/infra/k8s/__init__.py index ff6d210..c318975 100644 --- a/orca/topology/infra/k8s/__init__.py +++ b/orca/topology/infra/k8s/__init__.py @@ -11,3 +11,252 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from orca.topology import bundle +from orca.topology.infra.istio import linker as istio_linker +from orca.topology.infra.k8s import cluster, linker, probe + + +def get_probes(): + return [ + bundle.ProbeBundle( + probe=probe.PodPullProbe, + linkers=[ + linker.PodToServiceLinker, + linker.PodToReplicaSetLinker, + linker.PodToStatefulSetLinker, + linker.PodToDaemonSetLinker, + linker.PodToNodeLinker, + linker.ConfigMapToPodLinker, + linker.SecretToPodLinker, + linker.PersistentVolumeClaimToPodLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.PodPushProbe, + linkers=[ + linker.PodToServiceLinker, + linker.PodToReplicaSetLinker, + linker.PodToStatefulSetLinker, + linker.PodToDaemonSetLinker, + linker.PodToNodeLinker, + linker.ConfigMapToPodLinker, + linker.SecretToPodLinker, + linker.PersistentVolumeClaimToPodLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.ServicePullProbe, + linkers=[ + linker.PodToServiceLinker, + linker.EndpointsToServiceLinker, + istio_linker.VirtualServiceToServiceLinker, + istio_linker.DestinationRuleToServiceLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.ServicePushProbe, + linkers=[ + linker.PodToServiceLinker, + linker.EndpointsToServiceLinker, + istio_linker.VirtualServiceToServiceLinker, + istio_linker.DestinationRuleToServiceLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.EndpointsPullProbe, + linkers=[ + linker.EndpointsToServiceLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.EndpointsPushProbe, + linkers=[ + linker.EndpointsToServiceLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.DeploymentPullProbe, + linkers=[ + linker.DeploymentToHorizontalPodAutoscalerLinker, + linker.ReplicaSetToDeploymentLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.DeploymentPushProbe, + linkers=[ + linker.DeploymentToHorizontalPodAutoscalerLinker, + linker.ReplicaSetToDeploymentLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.ReplicaSetPullProbe, + linkers=[ + linker.PodToReplicaSetLinker, + linker.ReplicaSetToDeploymentLinker, + linker.ReplicaSetToHorizontalPodAutoscalerLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.ReplicaSetPushProbe, + linkers=[ + linker.PodToReplicaSetLinker, + linker.ReplicaSetToDeploymentLinker, + linker.ReplicaSetToHorizontalPodAutoscalerLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.DaemonSetPullProbe, + linkers=[ + linker.PodToDaemonSetLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.DaemonSetPushProbe, + linkers=[ + linker.PodToDaemonSetLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.StatefulSetPullProbe, + linkers=[ + linker.PodToStatefulSetLinker, + linker.StatefulSetToHorizontalPodAutoscalerLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.StatefulSetPushProbe, + linkers=[ + linker.PodToStatefulSetLinker, + linker.StatefulSetToHorizontalPodAutoscalerLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.ConfigMapPullProbe, + linkers=[ + linker.ConfigMapToPodLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.ConfigMapPushProbe, + linkers=[ + linker.ConfigMapToPodLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.SecretPullProbe, + linkers=[ + linker.SecretToPodLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.SecretPushProbe, + linkers=[ + linker.SecretToPodLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.StorageClassPullProbe, + linkers=[ + linker.PersistentVolumeToStorageClassLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.StorageClassPushProbe, + linkers=[ + linker.PersistentVolumeToStorageClassLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.PersistentVolumePullProbe, + linkers=[ + linker.PersistentVolumeToStorageClassLinker, + linker.PersistentVolumeToPersistentVolumeClaimLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.PersistentVolumePushProbe, + linkers=[ + linker.PersistentVolumeToStorageClassLinker, + linker.PersistentVolumeToPersistentVolumeClaimLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.PersistentVolumeClaimPullProbe, + linkers=[ + linker.PersistentVolumeToPersistentVolumeClaimLinker, + linker.PersistentVolumeClaimToPodLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.PersistentVolumeClaimPushProbe, + linkers=[ + linker.PersistentVolumeToPersistentVolumeClaimLinker, + linker.PersistentVolumeClaimToPodLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.HorizontalPodAutoscalerPullProbe, + linkers=[ + linker.DeploymentToHorizontalPodAutoscalerLinker, + linker.ReplicaSetToHorizontalPodAutoscalerLinker, + linker.StatefulSetToHorizontalPodAutoscalerLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.HorizontalPodAutoscalerPushProbe, + linkers=[ + linker.DeploymentToHorizontalPodAutoscalerLinker, + linker.ReplicaSetToHorizontalPodAutoscalerLinker, + linker.StatefulSetToHorizontalPodAutoscalerLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.NodePullProbe, + linkers=[ + linker.PodToNodeLinker, + linker.NodeToClusterLinker + ] + ), + + bundle.ProbeBundle( + probe=probe.NodePushProbe, + linkers=[ + linker.PodToNodeLinker, + linker.NodeToClusterLinker + ] + ), + + bundle.ProbeBundle( + probe=cluster.ClusterProbe, + linkers=[ + linker.NodeToClusterLinker + ] + ) + ] diff --git a/orca/topology/infra/k8s/manager.py b/orca/topology/infra/k8s/manager.py deleted file mode 100644 index c318975..0000000 --- a/orca/topology/infra/k8s/manager.py +++ /dev/null @@ -1,262 +0,0 @@ -# Copyright 2020 OpenRCA Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from orca.topology import bundle -from orca.topology.infra.istio import linker as istio_linker -from orca.topology.infra.k8s import cluster, linker, probe - - -def get_probes(): - return [ - bundle.ProbeBundle( - probe=probe.PodPullProbe, - linkers=[ - linker.PodToServiceLinker, - linker.PodToReplicaSetLinker, - linker.PodToStatefulSetLinker, - linker.PodToDaemonSetLinker, - linker.PodToNodeLinker, - linker.ConfigMapToPodLinker, - linker.SecretToPodLinker, - linker.PersistentVolumeClaimToPodLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.PodPushProbe, - linkers=[ - linker.PodToServiceLinker, - linker.PodToReplicaSetLinker, - linker.PodToStatefulSetLinker, - linker.PodToDaemonSetLinker, - linker.PodToNodeLinker, - linker.ConfigMapToPodLinker, - linker.SecretToPodLinker, - linker.PersistentVolumeClaimToPodLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.ServicePullProbe, - linkers=[ - linker.PodToServiceLinker, - linker.EndpointsToServiceLinker, - istio_linker.VirtualServiceToServiceLinker, - istio_linker.DestinationRuleToServiceLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.ServicePushProbe, - linkers=[ - linker.PodToServiceLinker, - linker.EndpointsToServiceLinker, - istio_linker.VirtualServiceToServiceLinker, - istio_linker.DestinationRuleToServiceLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.EndpointsPullProbe, - linkers=[ - linker.EndpointsToServiceLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.EndpointsPushProbe, - linkers=[ - linker.EndpointsToServiceLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.DeploymentPullProbe, - linkers=[ - linker.DeploymentToHorizontalPodAutoscalerLinker, - linker.ReplicaSetToDeploymentLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.DeploymentPushProbe, - linkers=[ - linker.DeploymentToHorizontalPodAutoscalerLinker, - linker.ReplicaSetToDeploymentLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.ReplicaSetPullProbe, - linkers=[ - linker.PodToReplicaSetLinker, - linker.ReplicaSetToDeploymentLinker, - linker.ReplicaSetToHorizontalPodAutoscalerLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.ReplicaSetPushProbe, - linkers=[ - linker.PodToReplicaSetLinker, - linker.ReplicaSetToDeploymentLinker, - linker.ReplicaSetToHorizontalPodAutoscalerLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.DaemonSetPullProbe, - linkers=[ - linker.PodToDaemonSetLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.DaemonSetPushProbe, - linkers=[ - linker.PodToDaemonSetLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.StatefulSetPullProbe, - linkers=[ - linker.PodToStatefulSetLinker, - linker.StatefulSetToHorizontalPodAutoscalerLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.StatefulSetPushProbe, - linkers=[ - linker.PodToStatefulSetLinker, - linker.StatefulSetToHorizontalPodAutoscalerLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.ConfigMapPullProbe, - linkers=[ - linker.ConfigMapToPodLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.ConfigMapPushProbe, - linkers=[ - linker.ConfigMapToPodLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.SecretPullProbe, - linkers=[ - linker.SecretToPodLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.SecretPushProbe, - linkers=[ - linker.SecretToPodLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.StorageClassPullProbe, - linkers=[ - linker.PersistentVolumeToStorageClassLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.StorageClassPushProbe, - linkers=[ - linker.PersistentVolumeToStorageClassLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.PersistentVolumePullProbe, - linkers=[ - linker.PersistentVolumeToStorageClassLinker, - linker.PersistentVolumeToPersistentVolumeClaimLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.PersistentVolumePushProbe, - linkers=[ - linker.PersistentVolumeToStorageClassLinker, - linker.PersistentVolumeToPersistentVolumeClaimLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.PersistentVolumeClaimPullProbe, - linkers=[ - linker.PersistentVolumeToPersistentVolumeClaimLinker, - linker.PersistentVolumeClaimToPodLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.PersistentVolumeClaimPushProbe, - linkers=[ - linker.PersistentVolumeToPersistentVolumeClaimLinker, - linker.PersistentVolumeClaimToPodLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.HorizontalPodAutoscalerPullProbe, - linkers=[ - linker.DeploymentToHorizontalPodAutoscalerLinker, - linker.ReplicaSetToHorizontalPodAutoscalerLinker, - linker.StatefulSetToHorizontalPodAutoscalerLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.HorizontalPodAutoscalerPushProbe, - linkers=[ - linker.DeploymentToHorizontalPodAutoscalerLinker, - linker.ReplicaSetToHorizontalPodAutoscalerLinker, - linker.StatefulSetToHorizontalPodAutoscalerLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.NodePullProbe, - linkers=[ - linker.PodToNodeLinker, - linker.NodeToClusterLinker - ] - ), - - bundle.ProbeBundle( - probe=probe.NodePushProbe, - linkers=[ - linker.PodToNodeLinker, - linker.NodeToClusterLinker - ] - ), - - bundle.ProbeBundle( - probe=cluster.ClusterProbe, - linkers=[ - linker.NodeToClusterLinker - ] - ) - ] diff --git a/orca/topology/infra/kiali/__init__.py b/orca/topology/infra/kiali/__init__.py index ff6d210..7774474 100644 --- a/orca/topology/infra/kiali/__init__.py +++ b/orca/topology/infra/kiali/__init__.py @@ -11,3 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from orca.topology.infra.kiali import probe +from orca.topology import bundle + + +def get_probes(): + return [ + bundle.ProbeBundle( + probe=probe.ServiceGraphProbe, + linkers=[] + ) + ] diff --git a/orca/topology/infra/kiali/manager.py b/orca/topology/infra/kiali/manager.py deleted file mode 100644 index 7774474..0000000 --- a/orca/topology/infra/kiali/manager.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2020 OpenRCA Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from orca.topology.infra.kiali import probe -from orca.topology import bundle - - -def get_probes(): - return [ - bundle.ProbeBundle( - probe=probe.ServiceGraphProbe, - linkers=[] - ) - ] diff --git a/orca/topology/manager.py b/orca/topology/manager.py index 7bcb7cd..0205f6a 100644 --- a/orca/topology/manager.py +++ b/orca/topology/manager.py @@ -18,10 +18,10 @@ from orca.common import config from orca.topology import probe -from orca.topology.alerts.prometheus import manager as prom -from orca.topology.infra.istio import manager as istio -from orca.topology.infra.k8s import manager as k8s -from orca.topology.infra.kiali import manager as kiali +from orca.topology.alerts import prometheus +from orca.topology.infra import istio +from orca.topology.infra import k8s +from orca.topology.infra import kiali CONFIG = config.CONFIG @@ -32,7 +32,7 @@ class Manager(cotyledon.ServiceManager): def initialize(self): graph_lock = multiprocessing.Lock() - probe_managers = [k8s, istio, prom, kiali] - for probe_manager in probe_managers: - for probe_bundle in probe_manager.get_probes(): + probe_modules = [k8s, istio, kiali, prometheus] + for probe_module in probe_modules: + for probe_bundle in probe_module.get_probes(): self.add(probe.ProbeRunner, workers=1, args=(probe_bundle, graph_lock)) From 68f0b9d7953d98b160db6dc79f65c1d3c2c264eb Mon Sep 17 00:00:00 2001 From: Bartosz Zurkowski Date: Fri, 27 Mar 2020 20:15:39 +0100 Subject: [PATCH 4/6] Refactor Prometheus probe factory method Signed-off-by: Bartosz Zurkowski --- orca/topology/alerts/prometheus/probe.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/orca/topology/alerts/prometheus/probe.py b/orca/topology/alerts/prometheus/probe.py index 1a3dd7e..8829c8d 100644 --- a/orca/topology/alerts/prometheus/probe.py +++ b/orca/topology/alerts/prometheus/probe.py @@ -15,9 +15,7 @@ from orca.common import config from orca.common.clients.prometheus import client as prometheus from orca.topology import probe, utils -from orca.topology.alerts import extractor -from orca.topology.alerts.prometheus import extractor as prom_extractor -from orca.topology.alerts.prometheus import upstream +from orca.topology.alerts.prometheus import extractor, upstream CONFIG = config.CONFIG @@ -28,11 +26,11 @@ class AlertProbe(probe.PullProbe): @classmethod def get(cls, graph): - source_mapper = extractor.SourceMapper('prometheus') - prom_client = prometheus.PrometheusClient.get(url=CONFIG.prometheus.url) + prom_client = prometheus.PrometheusClient.get( + url=CONFIG.prometheus.url) return cls( graph=graph, upstream_proxy=upstream.UpstreamProxy(prom_client), - extractor=prom_extractor.AlertExtractor(source_mapper), + extractor=extractor.AlertExtractor.get(), synchronizer=utils.NodeSynchronizer(graph), resync_period=CONFIG.prometheus.resync_period) From 3868c10a3e8d68c99cb70b6977cbebc08cf347eb Mon Sep 17 00:00:00 2001 From: Bartosz Zurkowski Date: Sat, 28 Mar 2020 10:32:57 +0100 Subject: [PATCH 5/6] Add ingestor registry Signed-off-by: Bartosz Zurkowski --- orca/api/resources/v1/ingestor.py | 36 +++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/orca/api/resources/v1/ingestor.py b/orca/api/resources/v1/ingestor.py index 9f95c22..a625ab7 100644 --- a/orca/api/resources/v1/ingestor.py +++ b/orca/api/resources/v1/ingestor.py @@ -40,18 +40,36 @@ def post(self): self._ingestor.ingest(payload) +class IngestorRegistry(object): + + """Registers ingestor API resources and graph linkers.""" + + def __init__(self, api, graph, event_dispatcher): + self._api = api + self._graph = graph + self._event_dispatcher = event_dispatcher + + def register(self, ingestor_bundle): + for linker_module in ingestor_bundle.linkers: + linker_instance = linker_module.get(self._graph) + self._event_dispatcher.add_linker(linker_instance) + + ingestor = ingestor_bundle.ingestor.get(self._graph) + endpoint = "/%s" % ingestor_bundle.name.lower() + + self._api.add_resource( + IngestorResource, endpoint, resource_class_args=(ingestor,)) + + def initialize(graph): api = Namespace('ingestor', description='Ingestor API') + + event_dispatcher = linker.EventDispatcher() + graph.add_listener(event_dispatcher) + ingestor_registry = IngestorRegistry(api, graph, event_dispatcher) + ingestor_modules = [prometheus, falco, elastalert] - linkers = [] for ingestor_module in ingestor_modules: for ingestor_bundle in ingestor_module.get_ingestors(): - for linker_module in ingestor_bundle.linkers: - linkers.append(linker_module.get(graph)) - api.add_resource(IngestorResource, "/%s" % ingestor_bundle.name.lower(), - resource_class_args=(ingestor_bundle.ingestor.get(graph),)) - dispatcher = linker.EventDispatcher() - for linker_instance in linkers: - dispatcher.add_linker(linker_instance) - graph.add_listener(dispatcher) + ingestor_registry.register(ingestor_bundle) return api From a9741971172b6be692dde67afe8f0d71bcd3d912 Mon Sep 17 00:00:00 2001 From: Bartosz Zurkowski Date: Sat, 28 Mar 2020 11:06:12 +0100 Subject: [PATCH 6/6] Minor refactorings Signed-off-by: Bartosz Zurkowski --- orca/api/resources/v1/ingestor.py | 3 --- orca/common/clients/kiali/client.py | 3 ++- orca/common/config.py | 1 - orca/topology/infra/istio/matcher.py | 2 +- orca/topology/probe.py | 6 +++--- 5 files changed, 6 insertions(+), 9 deletions(-) diff --git a/orca/api/resources/v1/ingestor.py b/orca/api/resources/v1/ingestor.py index a625ab7..1302e95 100644 --- a/orca/api/resources/v1/ingestor.py +++ b/orca/api/resources/v1/ingestor.py @@ -12,12 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json - from flask import request from flask_restplus import Namespace, Resource -from orca import exceptions from orca.common import logger from orca.topology import linker from orca.topology.alerts import elastalert diff --git a/orca/common/clients/kiali/client.py b/orca/common/clients/kiali/client.py index 8049d09..17fa8cf 100644 --- a/orca/common/clients/kiali/client.py +++ b/orca/common/clients/kiali/client.py @@ -30,7 +30,8 @@ def graph_namespaces(self, namespaces, graph_type='service'): "/namespaces/graph", namespaces=namespace_list, graphType=graph_type) @classmethod - def get(cls, url="http://localhost:20001", api_prefix="/kiali/api", username=None, password=None): + def get(cls, url="http://localhost:20001", api_prefix="/kiali/api", username=None, + password=None): basic_auth = None if username and password: basic_auth = auth.HTTPBasicAuth(username, password) diff --git a/orca/common/config.py b/orca/common/config.py index 11d3671..58af3e6 100644 --- a/orca/common/config.py +++ b/orca/common/config.py @@ -60,7 +60,6 @@ def parse(self, config_path): config_dict = file_utils.load_yaml(config_path) validator = cerberus.Validator(self._schema) is_valid = validator.validate(config_dict) - if not is_valid: raise exceptions.ConfigParseError(errors=validator.errors) return dict_lib.Dict(validator.document) diff --git a/orca/topology/infra/istio/matcher.py b/orca/topology/infra/istio/matcher.py index 03576cd..484b58d 100644 --- a/orca/topology/infra/istio/matcher.py +++ b/orca/topology/infra/istio/matcher.py @@ -70,6 +70,6 @@ def match_host_to_service(namespace, host, service): host_parts = host.split('.') service_name = host_parts[0] service_namespace = host_parts[1] if len(host_parts) > 1 else namespace - matched_name = service_name == service.properties.name + matched_name = service_name == service.properties.name matched_namespace = service_namespace == service.properties.namespace return matched_name and matched_namespace diff --git a/orca/topology/probe.py b/orca/topology/probe.py index 02f1081..bb3c338 100644 --- a/orca/topology/probe.py +++ b/orca/topology/probe.py @@ -61,10 +61,10 @@ def _initialize_linkers(self): return linkers def _setup_event_dispatcher(self, linkers): - dispatcher = linker.EventDispatcher() + event_dispatcher = linker.EventDispatcher() for linker_instance in linkers: - dispatcher.add_linker(linker_instance) - self._graph.add_listener(dispatcher) + event_dispatcher.add_linker(linker_instance) + self._graph.add_listener(event_dispatcher) class Probe(abc.ABC):