Skip to content

Commit

Permalink
Tidy up code with types and annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
mrchrisadams committed Nov 14, 2023
1 parent fd48dfc commit 5f91246
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 50 deletions.
16 changes: 8 additions & 8 deletions apps/greencheck/importers/importer_amazon.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import requests
import logging
from typing import List, Tuple, Union

from apps.greencheck.importers.network_importer import NetworkImporter
from apps.greencheck.importers.importer_interface import ImporterProtocol
import requests
from django.conf import settings

from apps.accounts.models.hosting import Hostingprovider

from django.conf import settings
from apps.greencheck.importers.importer_interface import ImporterProtocol
from apps.greencheck.importers.network_importer import NetworkImporter

logger = logging.getLogger(__name__)


class AmazonImporter:
"""
An importer that consumes an set of IP ranges from Amazon's AWS endpoint,
An importer that consumes a set of IP ranges from Amazon's AWS endpoint,
and updates the database with the new IP ranges for each region for the
provider
"""
Expand Down Expand Up @@ -45,7 +45,7 @@ def process(self, list_of_addresses):
network_importer.deactivate_ips()
return network_importer.process_addresses(list_of_addresses)

def fetch_data_from_source(self) -> str:
def fetch_data_from_source(self) -> dict:
"""
Fetch the data from the endpoint, returning the parsed json
"""
Expand All @@ -56,7 +56,7 @@ def fetch_data_from_source(self) -> str:
logger.warning("Unable to fetch ip data. Aborting early.")

# fill in the type signature to be a list of either IP Networks, or AS names
def parse_to_list(self, raw_data) -> "list[str]":
def parse_to_list(self, raw_data) -> List[Union[str, Tuple]]:
"""
Convert the parsed data into a list of either IP Ranges
"""
Expand Down
16 changes: 7 additions & 9 deletions apps/greencheck/importers/importer_csv.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import ipaddress
import logging
from typing import List, Tuple, Union

import pandas as pd

from typing import List
from apps.accounts.models.hosting import Hostingprovider

from apps.greencheck.importers.importer_interface import ImporterProtocol
from apps.greencheck.importers.network_importer import (
NetworkImporter,
is_asn,
is_ip_network,
is_ip_range,
is_asn,
)
from apps.greencheck.importers.importer_interface import ImporterProtocol

from apps.greencheck.models import GreencheckIp, GreencheckASN
import ipaddress
from django.conf import settings
from apps.greencheck.models import GreencheckASN, GreencheckIp

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,7 +42,7 @@ def fetch_data_from_source(cls, filepath_or_buffer) -> List:

return cls.parse_to_list(raw_data)

def parse_to_list(self, raw_data: pd.DataFrame) -> List:
def parse_to_list(self, raw_data: pd.DataFrame) -> List[Union[str, Tuple]]:
"""
Parse the provided pandas DataFrame, and return a flattened list
of ip ranges, or importable IP networks, or AS numbers
Expand Down
13 changes: 7 additions & 6 deletions apps/greencheck/importers/importer_equinix.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import requests
import logging
from typing import List, Tuple, Union

from apps.greencheck.importers.network_importer import NetworkImporter
from apps.greencheck.importers.importer_interface import ImporterProtocol
import requests
from django.conf import settings

from apps.accounts.models.hosting import Hostingprovider
from django.conf import settings
from apps.greencheck.importers.importer_interface import ImporterProtocol
from apps.greencheck.importers.network_importer import NetworkImporter

logger = logging.getLogger(__name__)

Expand All @@ -29,7 +30,7 @@ def fetch_data_from_source(cls) -> list:
except requests.RequestException:
logger.warning("Unable to fetch text file. Aborting early.")

def parse_to_list(cls, raw_data) -> list:
def parse_to_list(self, raw_data) -> List[Union[str, Tuple]]:
try:
list_of_ips = []
for line in raw_data.splitlines():
Expand All @@ -41,7 +42,7 @@ def parse_to_list(cls, raw_data) -> list:
) # Format as follows: IP range/ASN, Naming

return list_of_ips
except Exception as e:
except Exception:
logger.exception("Something really unexpected happened. Aborting")


Expand Down
9 changes: 4 additions & 5 deletions apps/greencheck/importers/importer_interface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import logging

from typing import Protocol, runtime_checkable

from typing import List, Protocol, Tuple, Union, runtime_checkable

logger = logging.getLogger(__name__)

Expand All @@ -15,10 +13,11 @@ def fetch_data_from_source(self) -> str:
"""
raise NotImplementedError

def parse_to_list(self, str) -> "list[str]":
def parse_to_list(self, raw_data) -> List[Union[str, Tuple]]:
"""
Returns a list of either strings that can be
parsed as AS numbers or IP networks
parsed as AS numbers or IP networks, or tuples
representing IP ranges
"""
raise NotImplementedError

Expand Down
11 changes: 6 additions & 5 deletions apps/greencheck/importers/importer_microsoft.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import requests
import logging
from typing import List, Tuple, Union

from apps.greencheck.importers.network_importer import NetworkImporter
from apps.greencheck.importers.importer_interface import ImporterProtocol
import requests
from django.conf import settings

from apps.accounts.models.hosting import Hostingprovider
from django.conf import settings
from apps.greencheck.importers.importer_interface import ImporterProtocol
from apps.greencheck.importers.network_importer import NetworkImporter

logger = logging.getLogger(__name__)

Expand All @@ -25,7 +26,7 @@ def fetch_data_from_source(self) -> list:
except requests.RequestException:
logger.warning("Unable to fetch ip range data. Aborting early.")

def parse_to_list(self, raw_data) -> list:
def parse_to_list(self, raw_data) -> List[Union[str, Tuple]]:
list_of_ips = []

# Extract IPv4 and IPv6
Expand Down
27 changes: 10 additions & 17 deletions apps/greencheck/importers/network_importer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import ipaddress
import logging

from typing import Union

from apps.greencheck.models import GreencheckIp, GreencheckASN
from apps.accounts.models import Hostingprovider
from apps.greencheck.models import GreencheckASN, GreencheckIp

logger = logging.getLogger(__name__)

Expand All @@ -13,9 +12,8 @@ def is_ip_network(address: str) -> bool:
"""
Check that "address" is a string we can parse to an ip network,
ready for saving as an ip range.
where we get addresses like '104.21.2.192/24'
we want to raise an exception rather than try to fix them on
the way in
Where we get addresses like '104.21.2.192/24'
we want to raise an exception rather than try to fix
"""

# exit early if we do not have slash dividing
Expand All @@ -32,9 +30,7 @@ def is_ip_network(address: str) -> bool:

def is_ip_range(address: Union[str, tuple]) -> bool:
"""
Check if "address" is a usable ip range, and return the tuple
containing the required ip addresses, ready for importing
as an ip range
Check if "address" is a valid ip range.
"""
# return early if not a tuple we can parse
if not isinstance(address, tuple):
Expand All @@ -54,7 +50,7 @@ def is_ip_range(address: Union[str, tuple]) -> bool:

def is_asn(address: Union[str, tuple]) -> bool:
"""
Check that "address" is a string suitable for saving as a AS number
Check that "address" is a string suitable for saving as an AS number
"""
# return early if not a string we cacn parse
if not isinstance(address, str):
Expand Down Expand Up @@ -110,10 +106,9 @@ def save_asn(self, address: str):
)

gc_asn.active = True
gc_asn.save() # Save the newly created or updated object
gc_asn.save()

if created:
logger.debug(gc_asn)
return (gc_asn, created)

return (gc_asn, False)
Expand All @@ -137,11 +132,9 @@ def save_ip(self, address, location=None):
gc_ip.location = location

gc_ip.active = True
gc_ip.save() # Save the newly created or updated object
gc_ip.save()

if created:
# Only log and return when a new object was created
logger.debug(gc_ip)
return (gc_ip, created)

return (gc_ip, False)
Expand All @@ -167,7 +160,7 @@ def process_addresses(self, list_of_addresses: list):

if is_ip_network(address):
# address looks like IPv4 or IPv6 network
# turn it to an ip_network opbject
# turn it to an ip_network object
network = ipaddress.ip_network(address)
green_ip, created = self.save_ip((network[0], network[-1]))

Expand Down Expand Up @@ -197,15 +190,15 @@ def process_addresses(self, list_of_addresses: list):
"created_green_ips": created_green_ips,
}
except ValueError:
logger.warn(f"An error occurred while adding new entries")
logger.warn("An error occurred while adding new entries")
return {
"green_ips": green_ips,
"green_asns": green_asns,
"created_asns": created_asns,
"created_green_ips": created_green_ips,
}
except Exception as e:
logger.exception(f"Something really unexpected happened. Aborting")
logger.exception("Something really unexpected happened. Aborting")
logger.exception(e)
return {
"green_ips": green_ips,
Expand Down

0 comments on commit 5f91246

Please sign in to comment.