Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement generic ingestor setup #21

Merged
merged 6 commits into from
Mar 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 32 additions & 40 deletions orca/api/resources/v1/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,69 +12,61 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json

from flask import request
from flask_restplus import Namespace, Resource

from orca import exceptions
from orca.common import logger
from orca.topology.alerts.elastalert import manager as es
from orca.topology.alerts.falco import manager as falco
from orca.topology.alerts.prometheus import manager as prometheus
from orca.topology import linker
from orca.topology.alerts import elastalert
from orca.topology.alerts import falco
from orca.topology.alerts import prometheus

LOG = logger.get_logger(__name__)


class Ingestor(Resource):
class IngestorResource(Resource):

"""Base class for alert ingestors."""
"""Endpoint for ingesting events from the upstream."""

def __init__(self, api, entity_handler):
def __init__(self, api, ingestor):
super().__init__()
self._entity_handler = entity_handler
self._ingestor = ingestor

def post(self):
payload = request.json
LOG.debug("Ingested an entity: %s", json.dumps(payload))
self._ingest(payload)

def _ingest(self, entity):
try:
self._entity_handler.handle_event(entity)
except exceptions.OrcaError as ex:
LOG.debug("Error while processing an entity: %s", ex)

self._ingestor.ingest(payload)

class Prometheus(Ingestor):

"""Prometheus ingest endpoint."""
class IngestorRegistry(object):

def post(self):
payload = request.json
for alert in payload['alerts']:
self._ingest(alert)
"""Registers ingestor API resources and graph linkers."""

def __init__(self, api, graph, event_dispatcher):
self._api = api
self._graph = graph
self._event_dispatcher = event_dispatcher

class Falco(Ingestor):
def register(self, ingestor_bundle):
for linker_module in ingestor_bundle.linkers:
linker_instance = linker_module.get(self._graph)
self._event_dispatcher.add_linker(linker_instance)

"""Falco ingest endpoint."""
ingestor = ingestor_bundle.ingestor.get(self._graph)
endpoint = "/%s" % ingestor_bundle.name.lower()


class Elastalert(Ingestor):

"""Elastalert ingest endpoint."""
self._api.add_resource(
IngestorResource, endpoint, resource_class_args=(ingestor,))


def initialize(graph):
api = Namespace('ingestor', description='Ingestor API')
api.add_resource(
Prometheus, '/prometheus',
resource_class_args=[prometheus.initialize_handler(graph)])
api.add_resource(
Falco, '/falco',
resource_class_args=[falco.initialize_handler(graph)])
api.add_resource(
Elastalert, '/elastalert',
resource_class_args=[es.initialize_handler(graph)])

event_dispatcher = linker.EventDispatcher()
graph.add_listener(event_dispatcher)
ingestor_registry = IngestorRegistry(api, graph, event_dispatcher)

ingestor_modules = [prometheus, falco, elastalert]
for ingestor_module in ingestor_modules:
for ingestor_bundle in ingestor_module.get_ingestors():
ingestor_registry.register(ingestor_bundle)
return api
3 changes: 2 additions & 1 deletion orca/common/clients/kiali/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def graph_namespaces(self, namespaces, graph_type='service'):
"/namespaces/graph", namespaces=namespace_list, graphType=graph_type)

@classmethod
def get(cls, url="http://localhost:20001", api_prefix="/kiali/api", username=None, password=None):
def get(cls, url="http://localhost:20001", api_prefix="/kiali/api", username=None,
password=None):
basic_auth = None
if username and password:
basic_auth = auth.HTTPBasicAuth(username, password)
Expand Down
1 change: 0 additions & 1 deletion orca/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def parse(self, config_path):
config_dict = file_utils.load_yaml(config_path)
validator = cerberus.Validator(self._schema)
is_valid = validator.validate(config_dict)

if not is_valid:
raise exceptions.ConfigParseError(errors=validator.errors)
return dict_lib.Dict(validator.document)
Expand Down
13 changes: 13 additions & 0 deletions orca/topology/alerts/elastalert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from orca.topology import bundle
from orca.topology.alerts.elastalert import linker, ingestor


def get_ingestors():
return [
bundle.IngestorBundle(
name='elastalert',
ingestor=ingestor.AlertIngestor,
linkers=[linker.AlertLinker]
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,11 @@
# limitations under the License.

from orca.topology import ingestor
from orca.topology.alerts.falco import extractor, linker
from orca.topology.alerts.elastalert import extractor


def initialize_probes(graph):
return []
class AlertIngestor(ingestor.Ingestor):


def initialize_linkers(graph):
return [linker.AlertLinker.get(graph)]


def initialize_handler(graph):
return ingestor.EventHandler(graph, extractor.AlertExtractor.get())
@classmethod
def get(cls, graph):
return cls(graph, extractor.AlertExtractor.get())
13 changes: 13 additions & 0 deletions orca/topology/alerts/falco/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from orca.topology import bundle
from orca.topology.alerts.falco import linker, ingestor


def get_ingestors():
return [
bundle.IngestorBundle(
name='falco',
ingestor=ingestor.AlertIngestor,
linkers=[linker.AlertLinker]
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from orca.topology.infra.kiali import probe
from orca.topology import bundle
from orca.topology import ingestor
from orca.topology.alerts.falco import extractor


def get_bundles():
return [
bundle.ProbeBundle(
probe=probe.ServiceGraphProbe,
linkers=[]
)
]
class AlertIngestor(ingestor.Ingestor):

@classmethod
def get(cls, graph):
return cls(graph, extractor.AlertExtractor.get())
22 changes: 22 additions & 0 deletions orca/topology/alerts/prometheus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from orca.topology import bundle
from orca.topology.alerts.prometheus import ingestor, linker, probe


def get_probes():
return [
bundle.ProbeBundle(
probe=probe.AlertProbe,
linkers=[linker.AlertLinker]
)
]


def get_ingestors():
return [
bundle.IngestorBundle(
name='prometheus',
ingestor=ingestor.AlertIngestor,
linkers=[linker.AlertLinker]
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
# limitations under the License.

from orca.topology import ingestor
from orca.topology.alerts.elastalert import extractor, linker
from orca.topology.alerts.prometheus import extractor


def initialize_probes(graph):
return []
class AlertIngestor(ingestor.Ingestor):

def ingest(self, event):
for alert in event['alerts']:
super().ingest(alert)

def initialize_linkers(graph):
return [linker.AlertLinker.get(graph)]


def initialize_handler(graph):
return ingestor.EventHandler(graph, extractor.AlertExtractor.get())
@classmethod
def get(cls, graph):
return cls(graph, extractor.AlertEventExtractor.get())
32 changes: 0 additions & 32 deletions orca/topology/alerts/prometheus/manager.py

This file was deleted.

10 changes: 4 additions & 6 deletions orca/topology/alerts/prometheus/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
from orca.common import config
from orca.common.clients.prometheus import client as prometheus
from orca.topology import probe, utils
from orca.topology.alerts import extractor
from orca.topology.alerts.prometheus import extractor as prom_extractor
from orca.topology.alerts.prometheus import upstream
from orca.topology.alerts.prometheus import extractor, upstream

CONFIG = config.CONFIG

Expand All @@ -28,11 +26,11 @@ class AlertProbe(probe.PullProbe):

@classmethod
def get(cls, graph):
source_mapper = extractor.SourceMapper('prometheus')
prom_client = prometheus.PrometheusClient.get(url=CONFIG.prometheus.url)
prom_client = prometheus.PrometheusClient.get(
url=CONFIG.prometheus.url)
return cls(
graph=graph,
upstream_proxy=upstream.UpstreamProxy(prom_client),
extractor=prom_extractor.AlertExtractor(source_mapper),
extractor=extractor.AlertExtractor.get(),
synchronizer=utils.NodeSynchronizer(graph),
resync_period=CONFIG.prometheus.resync_period)
10 changes: 10 additions & 0 deletions orca/topology/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,13 @@ class ProbeBundle(object):
def __init__(self, probe, linkers):
self.probe = probe
self.linkers = linkers


class IngestorBundle(object):

"""Value object holding ingestor runtime spec."""

def __init__(self, name, ingestor, linkers):
self.name = name
self.ingestor = ingestor
self.linkers = linkers
51 changes: 51 additions & 0 deletions orca/topology/infra/istio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,54 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from orca.topology.infra.istio import linker, probe
from orca.topology import bundle


def get_probes():
return [
bundle.ProbeBundle(
probe=probe.VirtualServicePullProbe,
linkers=[
linker.VirtualServiceToGatewayLinker,
linker.VirtualServiceToServiceLinker
]
),

bundle.ProbeBundle(
probe=probe.VirtualServicePushProbe,
linkers=[
linker.VirtualServiceToGatewayLinker,
linker.VirtualServiceToServiceLinker
]
),

bundle.ProbeBundle(
probe=probe.DestinationRulePullProbe,
linkers=[
linker.DestinationRuleToServiceLinker
]
),

bundle.ProbeBundle(
probe=probe.DestinationRulePushProbe,
linkers=[
linker.DestinationRuleToServiceLinker
]
),

bundle.ProbeBundle(
probe=probe.GatewayPullProbe,
linkers=[
linker.VirtualServiceToGatewayLinker
]
),

bundle.ProbeBundle(
probe=probe.GatewayPushProbe,
linkers=[
linker.VirtualServiceToGatewayLinker
]
),
]
Loading