Skip to content

Commit

Permalink
feat: improve code format and style
Browse files Browse the repository at this point in the history
Refactored code to improve readability and compliance with PEP 8 standards. This includes removing unnecessary whitespace, improving line alignment, and adding spaces around operators and after commas where needed. Also corrected some type hinting issues. No changes were made to the logic or functionality of the code.
  • Loading branch information
cpainchaud committed Jun 17, 2024
1 parent d7436d3 commit d4a8f10
Show file tree
Hide file tree
Showing 33 changed files with 207 additions and 292 deletions.
65 changes: 31 additions & 34 deletions illumio_pylo/API/APIConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ def get_field_or_die(field_name: str, data):


ObjectTypes = Literal['iplists', 'workloads', 'virtual_services', 'labels', 'labelgroups', 'services', 'rulesets',
'security_principals', 'label_dimensions']
'security_principals', 'label_dimensions']

all_object_types: Dict[ObjectTypes, ObjectTypes] = {
'iplists': 'iplists',
'workloads': 'workloads',
'virtual_services': 'virtual_services',
'labels': 'labels',
'labelgroups': 'labelgroups',
'services': 'services',
'rulesets': 'rulesets',
'security_principals': 'security_principals',
'label_dimensions': 'label_dimensions'
}
'iplists': 'iplists',
'workloads': 'workloads',
'virtual_services': 'virtual_services',
'labels': 'labels',
'labelgroups': 'labelgroups',
'services': 'services',
'rulesets': 'rulesets',
'security_principals': 'security_principals',
'label_dimensions': 'label_dimensions'
}


class APIConnector:
Expand All @@ -69,7 +69,7 @@ def __init__(self, fqdn: str, port, api_user: str, api_key: str, skip_ssl_cert_c
port = str(port)
self.port: int = port
self._api_key: str = api_key
self._decrypted_api_key: str = None
self._decrypted_api_key: Optional[str] = None
self.api_user: str = api_user
self.org_id: int = org_id
self.skipSSLCertCheck: bool = skip_ssl_cert_check
Expand All @@ -86,7 +86,6 @@ def api_key(self):
return self._decrypted_api_key
return self._api_key


@staticmethod
def get_all_object_types_names_except(exception_list: List[ObjectTypes]):

Expand Down Expand Up @@ -138,7 +137,7 @@ def create_from_credentials_in_file(fqdn_or_profile_name: str, request_if_missin
connector = pylo.APIConnector(fqdn_or_profile_name, port, user, password, skip_ssl_cert_check=True, name=name)
return connector

def _make_base_url(self, path: str='') -> str:
def _make_base_url(self, path: str = '') -> str:
# remove leading '/' from path if exists
if len(path) > 0 and path[0] == '/':
path = path[1:]
Expand Down Expand Up @@ -222,7 +221,7 @@ def _do_call(self, method, path, json_arguments=None, include_org_id=True, json_
raise pylo.PyloApiEx("PCE connectivity or low level issue: {}".format(e))

answer_size = len(req.content) / 1024
log.info("URL downloaded (size "+str( int(answer_size) )+"KB) Reply headers:\n" +
log.info("URL downloaded (size "+str(int(answer_size))+"KB) Reply headers:\n" +
"HTTP " + method + " " + url + " STATUS " + str(req.status_code) + " " + req.reason)
log.info(req.headers)
# log.info("Request Body:" + pylo.nice_json(json_arguments))
Expand All @@ -247,7 +246,9 @@ def _do_call(self, method, path, json_arguments=None, include_org_id=True, json_
retry_loop_times = 0

while True:
log.info("Sleeping " + str(retry_interval) + " seconds before polling for job status, elapsed " + str(retry_interval*retry_loop_times) + " seconds so far" )
log.info(
"Sleeping " + str(retry_interval) + " seconds before polling for job status, elapsed " + str(
retry_interval * retry_loop_times) + " seconds so far")
retry_loop_times += 1
time.sleep(retry_interval)
job_poll = self.do_get_call(job_location, include_org_id=False)
Expand Down Expand Up @@ -304,17 +305,18 @@ def _do_call(self, method, path, json_arguments=None, include_org_id=True, json_
continue

if req.status_code == 403:
raise pylo.PyloApiRequestForbiddenEx('API returned error status "' + str(req.status_code) + ' ' + req.reason
+ '" and error message: ' + req.text)
raise pylo.PyloApiRequestForbiddenEx(
'API returned error status "' + str(req.status_code) + ' ' + req.reason
+ '" and error message: ' + req.text)

raise pylo.PyloApiEx('API returned error status "' + str(req.status_code) + ' ' + req.reason
+ '" and error message: ' + req.text)
+ '" and error message: ' + req.text)

if return_headers:
return req.headers

if json_output_expected:
log.info("Parsing API answer to JSON (with a size of " + str( int(answer_size) ) + "KB)")
log.info("Parsing API answer to JSON (with a size of " + str(int(answer_size)) + "KB)")
json_out = req.json()
log.info("Done!")
if answer_size < 5:
Expand Down Expand Up @@ -387,7 +389,6 @@ def get_pce_objects(self, include_deleted_workloads=False, list_of_objects_to_lo
if 'label_dimensions' in objects_to_load:
del objects_to_load['label_dimensions']


threads_count = 4
data: PCEObjectsJsonStructure = pylo.Organization.create_fake_empty_config()
errors = []
Expand Down Expand Up @@ -660,7 +661,7 @@ def objects_ven_get(self,
def objects_workload_get(self,
include_deleted=False,
filter_by_ip: str = None,
filter_by_label: WorkloadsGetQueryLabelFilterJsonStructure=None,
filter_by_label: WorkloadsGetQueryLabelFilterJsonStructure = None,
filter_by_name: str = None,
filter_by_managed: bool = None,
filer_by_policy_health: Literal['active', 'warning', 'error'] = None,
Expand Down Expand Up @@ -787,12 +788,11 @@ def execute(self, unpair_agents=False):

try:
result = self.connector.objects_workload_delete_multi(list(self._hrefs.keys()))
except Exception as ex: #global exception means something really bad happened we log errors for all workloads
except Exception as ex: # global exception means something really bad happened we log errors for all workloads
for href in self._hrefs.keys():
self._errors[href] = str(ex)
return


# print(pylo.nice_json(result))
if not type(result) is list:
raise pylo.PyloApiEx("API didnt return expected JSON format", result)
Expand Down Expand Up @@ -837,7 +837,6 @@ def _unpair_agents(self, workloads_hrefs: [str]):
self._errors[href] = str(ex)
break


def count_entries(self):
return len(self._hrefs)

Expand All @@ -847,7 +846,7 @@ def count_errors(self):
def new_tracker_workload_multi_delete(self):
return APIConnector.WorkloadMultiDeleteTracker(self)

def objects_workload_delete_multi(self, href_or_workload_array: Union[List['pylo.Workload'],List[str]]):
def objects_workload_delete_multi(self, href_or_workload_array: Union[List['pylo.Workload'], List[str]]):
if len(href_or_workload_array) < 1:
return

Expand Down Expand Up @@ -923,8 +922,7 @@ def objects_service_delete(self, href):

return self.do_delete_call(path=path, json_output_expected=False, include_org_id=False)

def objects_network_device_get(self,
max_results: int = None) -> List[NetworkDeviceObjectJsonStructure]:
def objects_network_device_get(self, max_results: int = None) -> List[NetworkDeviceObjectJsonStructure]:
path = '/network_devices'
data = {}

Expand Down Expand Up @@ -1069,7 +1067,7 @@ def objects_rule_create(self, ruleset_href: str,
'enabled': enabled,
'stateless': stateless,
'consuming_security_principals': consuming_security_principals,
'resolve_labels_as': {'providers': resolve_providers, 'consumers': resolve_consumers,},
'resolve_labels_as': {'providers': resolve_providers, 'consumers': resolve_consumers},
'consumers': consumers_json,
'providers': providers_json,
'ingress_services': services_json
Expand Down Expand Up @@ -1270,7 +1268,7 @@ def explorer_search(self, filters: Union[Dict, 'pylo.ExplorerFilterSetV1'],
# get current timestamp to ensure we don't wait too long
start_time = time.time()

query_status = None # Json response from API for specific query
query_status = None # Json response from API for specific query

while True:
# check that we don't wait too long
Expand Down Expand Up @@ -1325,20 +1323,19 @@ def new_explorer_query(self, max_results: int = 1500, max_running_time_seconds:
check_for_update_interval_seconds: int = 10) -> 'pylo.ExplorerQuery':
return pylo.ExplorerQuery(self, max_results, max_running_time_seconds, check_for_update_interval_seconds)


def new_audit_log_query(self, max_results: int = 10000, max_running_time_seconds: int = 1800,
check_for_update_interval_seconds: int = 10) -> 'pylo.AuditLogQuery':
return pylo.AuditLogQuery(self, max_results, max_running_time_seconds )
return pylo.AuditLogQuery(self, max_results, max_running_time_seconds)

def audit_log_query(self, max_results = 1000, event_type: Optional[str] = None) -> List[AuditLogApiReplyEventJsonStructure]:
def audit_log_query(self, max_results=1000, event_type: Optional[str] = None) \
-> List[AuditLogApiReplyEventJsonStructure]:
url = '/events'
args = {'max_results': max_results}
if event_type is not None:
args['event_type'] = event_type

return self.do_get_call(path=url, params=args)


def get_pce_ui_workload_url(self, href: str) -> str:
# extract UUID from workload HREF:
uuid = href.split('/')[-1]
Expand Down
3 changes: 3 additions & 0 deletions illumio_pylo/API/JsonPayloadTypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ class RulesetObjectJsonStructure(TypedDict):
scopes: List[List[RulesetScopeEntryLineJsonStructure]]
updated_at: str
updated_by: Optional[HrefReferenceWithName]
enabled: bool



class RulesetObjectUpdateStructure(TypedDict):
Expand Down Expand Up @@ -258,6 +260,7 @@ class PCEObjectsJsonStructure(TypedDict):
virtual_services: List[VirtualServiceObjectJsonStructure]
workloads: List[WorkloadObjectJsonStructure]
label_dimensions: List[LabelDimensionObjectStructure]
pce_version: str


class PCECacheFileJsonStructure(TypedDict):
Expand Down
21 changes: 8 additions & 13 deletions illumio_pylo/Helpers/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import re
import time
import functools


def nice_json(json_obj):
Expand Down Expand Up @@ -133,7 +134,7 @@ def is_valid_ipv4(ip):
def is_valid_ipv6(ip):
"""Validates IPv6 addresses.
"""
#print("testing ({})".format(ip))
# print("testing ({})".format(ip))
return ___ipv6_pattern.match(ip) is not None


Expand All @@ -145,22 +146,16 @@ def hostname_from_fqdn(fqdn: str):
__clocks_end = {}


def clock_start(name:str = 'default'):
def clock_start(name: str = 'default'):
__clocks_start[name] = time.time()


def clock_stop(name:str = 'default'):
def clock_stop(name: str = 'default'):
__clocks_end[name] = time.time()


import functools


def clock_elapsed_str(name:str = 'default'):
t = time.time()-__clocks_start[name]
def clock_elapsed_str(name: str = 'default'):
t = time.time() - __clocks_start[name]
return "%d:%02d:%02d.%03d" % \
functools.reduce(lambda ll,b : divmod(ll[0],b) + ll[1:],
[(t*1000,),1000,60,60])
return "{}".format(time.time()-__clocks_start[name])


functools.reduce(lambda ll, b: divmod(ll[0], b) + ll[1:],
[(t * 1000,), 1000, 60, 60])
14 changes: 5 additions & 9 deletions illumio_pylo/IPList.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from typing import Dict, Optional

import illumio_pylo as pylo
from .API.JsonPayloadTypes import IPListObjectJsonStructure
from illumio_pylo import log
from .Helpers import *
Expand Down Expand Up @@ -59,15 +56,15 @@ def load_from_json(self, json_input: IPListObjectJsonStructure):
self.raw_entries[entry] = entry

def get_ip4map(self) -> pylo.IP4Map:
map = pylo.IP4Map()
new_map = pylo.IP4Map()

for entry in self.raw_entries:
if entry[0] == '!':
map.subtract_from_text(entry[1:], ignore_ipv6=True)
new_map.subtract_from_text(entry[1:], ignore_ipv6=True)
else:
map.add_from_text(entry, ignore_ipv6=True)
new_map.add_from_text(entry, ignore_ipv6=True)

return map
return new_map

def get_raw_entries_as_string_list(self, separator=',') -> str:
return pylo.string_list_to_text(self.raw_entries.values(), separator=separator)
Expand Down Expand Up @@ -115,7 +112,7 @@ def load_iplists_from_json(self, json_list: list[IPListObjectJsonStructure]):
def find_by_href(self, href: str) -> Optional['pylo.IPList']:
return self.items_by_href.get(href)

def find_by_name(self, name: str, case_sensitive: bool = True ) -> Optional['pylo.IPList']:
def find_by_name(self, name: str, case_sensitive: bool = True) -> Optional['pylo.IPList']:
if case_sensitive:
for iplist in self.items_by_href.values():
if iplist.name == name:
Expand All @@ -126,4 +123,3 @@ def find_by_name(self, name: str, case_sensitive: bool = True ) -> Optional['pyl
if iplist.name.lower() == lower_name:
return iplist
return None

6 changes: 3 additions & 3 deletions illumio_pylo/IPMap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .Helpers.functions import is_valid_ipv6, string_list_to_text
import ipaddress
import copy
from typing import Optional, List, Dict
from typing import Optional, List


def sort_first(val):
Expand Down Expand Up @@ -232,13 +232,13 @@ def to_list_of_cidr_string(self, skip_netmask_for_32=False):

for netmask in range(1, 32, 1):
new_end = net_start | masks[netmask]
#print("{}/{}/{}/{}".format(ipaddress.IPv4Address(net_start), ipaddress.IPv4Address(net_end), ipaddress.IPv4Address(new_end), 32 - netmask))
# print("{}/{}/{}/{}".format(ipaddress.IPv4Address(net_start), ipaddress.IPv4Address(net_end), ipaddress.IPv4Address(new_end), 32 - netmask))

if new_end > net_end:
result.append('{}/{}'.format(ipaddress.IPv4Address(net_start), 33 - netmask))
net_start = previous_loop_end + 1
previous_loop_end = net_start
#print("breaking loop with {}/{}".format(ipaddress.IPv4Address(net_start), ipaddress.IPv4Address(previous_loop_end)))
# print("breaking loop with {}/{}".format(ipaddress.IPv4Address(net_start), ipaddress.IPv4Address(previous_loop_end)))
break

if new_end == net_end:
Expand Down
1 change: 0 additions & 1 deletion illumio_pylo/Label.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,3 @@ def reference_obj(self):

def get_api_reference_json(self) -> LabelHrefRef:
return {'label': {'href': self.href}}

2 changes: 1 addition & 1 deletion illumio_pylo/LabelCommon.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Union
from .Exception import PyloEx
from .LabelStore import label_type_role, label_type_env, label_type_loc, label_type_app, LabelStore
from .LabelStore import LabelStore


class LabelCommon:
Expand Down
Loading

0 comments on commit d4a8f10

Please sign in to comment.