Merge pull request #23 from Urban-Research-Group/refactor
Refactor
Nicholas-Polimeni authored Jan 26, 2024
2 parents a0c3e80 + 3b14459 commit a9d3f41
Showing 12 changed files with 403 additions and 324 deletions.
File renamed without changes.
52 changes: 52 additions & 0 deletions src/config.py
@@ -0,0 +1,52 @@
from dataclasses import dataclass
import pandas as pd
from src import file_io
from src import file_utils


# TODO: check YAML formatting
@dataclass
class DataInfo:
    """Retains data parameters for processing instructions"""

    county_name: str
    county_path: str
    var_map_path: str
    operations: dict
    output: dict
    county_files: list = None
    var_map_non_derived: pd.DataFrame = None
    var_map_derived: pd.DataFrame = None


def create_data_info(config_path: str) -> DataInfo:
    """Reads the YAML config instruction set and initializes a
    DataInfo dataclass with values from the config.

    Args:
        config_path (str): path to the config YAML

    Returns:
        DataInfo: dataclass containing config parameters
    """
    config = file_io.read_config(config_path)
    county_path = config["county-path"]
    var_map_path = config["var-map-path"]

    county_files = file_utils.get_all_files_in_dir(county_path)
    var_map = file_io.read_var_map(var_map_path)

    # rows whose "derived" column reads "false" are the non-derived variables
    non_derived_mask = var_map["derived"].str.lower().eq("false")
    var_map_non_derived = var_map[non_derived_mask]
    var_map_derived = var_map[~non_derived_mask]

    return DataInfo(
        county_name=config["county-name"],
        county_path=county_path,
        var_map_path=var_map_path,
        operations=config["operations"],
        output=config["output"],
        county_files=county_files,
        var_map_non_derived=var_map_non_derived,
        var_map_derived=var_map_derived,
    )
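
For orientation, a minimal instruction set that create_data_info can consume might look like the sketch below, shown as the dict that file_io.read_config would return after parsing the YAML. All values are hypothetical; the schemas of the operations and output mappings are defined by their consumers (operations.py and WriteOutput), not by this file.

# Hypothetical parsed config, i.e. the dict returned by file_io.read_config
config = {
    "county-name": "Fulton",                    # example value
    "county-path": "data/fulton",               # directory walked by get_all_files_in_dir
    "var-map-path": "data/fulton_var_map.csv",  # CSV with a "derived" column of "true"/"false" strings
    "operations": {},                           # schema consumed by operations.py
    "output": {},                               # e.g. a path and formats for WriteOutput
}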
110 changes: 110 additions & 0 deletions src/file_io.py
@@ -0,0 +1,110 @@
import yaml
import numpy as np
import pandas as pd
from src.logger import configure_logger, timing

logger = configure_logger()


class File:
    def __init__(self, file_path: str, format_file=None):
        self.file_path = file_path
        self.format_file = format_file

    @staticmethod
    def _read_format_file(path):
        return np.loadtxt(path, dtype=str)

    @staticmethod
    def _get_widths_from_format(format_file: np.ndarray):
        return format_file[:, 2].astype(np.int16).tolist()

    @staticmethod
    def _get_column_headers_from_format(format_file: np.ndarray):
        header_col = format_file[:, 0]
        return header_col

    def _get_fwf_parameters(self):
        format_file = File._read_format_file(self.format_file)
        widths = File._get_widths_from_format(format_file)
        column_headers = File._get_column_headers_from_format(format_file)
        return widths, column_headers

    @timing
    def read(self):
        """Reads a file based on its extension and returns a dataframe"""
        file_type = self.file_path.split(".")[-1].lower()

        match file_type:
            case "csv":
                df = pd.read_csv(self.file_path, encoding="latin-1", low_memory=False)
            case "xlsx" | "xls":
                df = pd.read_excel(self.file_path)
            case "txt":
                widths, column_headers = self._get_fwf_parameters()
                df = pd.read_fwf(
                    self.file_path, widths=widths, header=None, encoding="latin-1"
                )
                df.columns = column_headers
            case "dat":
                raise NotImplementedError(".dat files are not supported yet")
            case _:
                error_msg = f"File type {file_type} not supported"
                logger.error(error_msg)
                raise ValueError(error_msg)

        return df


def read_var_map(var_map_path: str) -> pd.DataFrame:
    try:
        var_map = pd.read_csv(var_map_path)
    except Exception as e:
        logger.error("Could not read var map file with error %s", e)
        raise

    return var_map


def read_config(config_path: str) -> dict:
    try:
        with open(config_path, "r") as config_file:
            config = yaml.safe_load(config_file)
    except Exception as e:
        logger.error("Could not read config file with error %s", e)
        raise

    return config


class WriteOutput:
    def __init__(self, data, output_path, output_formats):
        self.data = data
        self.output_path = output_path
        self.output_formats = output_formats

    @timing
    def write_output(self):
        for output_format in self.output_formats:
            if output_format == "csv":
                self.write_csv()
            elif output_format == "parquet":
                self.write_parquet()
            else:
                logger.warning("%s is not supported!", output_format)

    def write_csv(self):
        try:
            self.data.to_csv(self.output_path + ".csv")
            self.log("CSV", None)
        except Exception as e:
            self.log("CSV", e)

    def write_parquet(self):
        try:
            self.data.to_parquet(self.output_path + ".parquet")
            self.log("Parquet", None)
        except Exception as e:
            self.log("Parquet", e)

    def log(self, file_type, e):
        if e:
            logger.error("Error writing %s output: %s", file_type, e)
        else:
            logger.info("Saved to %s", self.output_path)
129 changes: 7 additions & 122 deletions src/file_utils.py
@@ -1,54 +1,12 @@
 import os
 import pandas as pd
-import numpy as np
-import operations
-from logger import configure_logger, timing
+from src import file_io
+from src import operations
+from src.logger import configure_logger

 logger = configure_logger()


-def _read_format_file(path):
-    return np.loadtxt(path, dtype=str)
-
-
-def _get_widths_from_format(format_file: np.array):
-    return format_file[:, 2].astype(np.int16).tolist()
-
-
-def _get_column_headers_from_format(format_file: np.array):
-    header_col = format_file[:, 0]
-    return header_col
-
-
-def _get_fwf_paramters(format_file_path):
-    format_file = _read_format_file(format_file_path)
-    widths = _get_widths_from_format(format_file)
-    column_headers = _get_column_headers_from_format(format_file)
-    return widths, column_headers
-
-
-def read_var_map(var_map_path: str) -> dict[str, str]:
-    """_summary_
-    Args:
-        var_map_path (str): _description_
-    Returns:
-        dict[str, str]: _description_
-    """
-    var_map = pd.read_csv(var_map_path)
-    # we want a dict with (old, source): {new var name: val, type: val, source: val}
-    var_dict = {
-        (row["old_name"], row["source_file"]): {
-            "new_name": row["new_name"],
-            "data_type": row["data_type"],
-        }
-        for _, row in var_map.iterrows()
-        if row["derived"] == "false"
-    }
-    return var_dict
-
-
 def get_all_files_in_dir(directory: str) -> list[str]:
     """Returns a list of all files in a directory and its subdirectories
@@ -79,81 +37,8 @@ def select_files(file_paths: list[str], key: str) -> list[str]:
     return [file_path for file_path in file_paths if key in file_path]


-@timing
-def create_df_from_file(file_path: str, format_file=None) -> pd.DataFrame:
-    """Reads a file based on its extension and returns a dataframe
-    Args:
-        file_path (str): path to file
-        format_file_pattern (str, optional): regex pattern for format file, if needed.
-            Defaults to None.
-    Raises:
-        ValueError: if file type is not supported
-    Returns:
-        pd.DataFrame: dataframe of file
-    """
-    # TODO add and validate file types
-    file_type = file_path.split(".")[-1].lower()
-
-    match file_type:
-        case "csv":
-            df = pd.read_csv(file_path, encoding="latin-1", low_memory=False)
-        case "xlsx" | "xls":
-            df = pd.read_excel(file_path)
-        case "txt":
-            widths, column_headers = _get_fwf_paramters(format_file)
-            df = pd.read_fwf(file_path, widths=widths, header=None, encoding="latin-1")
-            df.columns = column_headers
-        case "dat":
-            NotImplemented
-        case _:
-            raise ValueError(f"File type {file_type} not supported")
-
-    return df
-
-
-def get_desired_cols(file_name: str, var_map: dict) -> list[str]:
-    """Returns a list of all original column names that are desired
-    Args:
-        file_name (str): _description_
-        var_map (dict): _description_
-    Returns:
-        list[str]: _description_
-    """
-    return [
-        key[0]
-        for key in var_map.keys()
-        if key[1].lower() == "all" or file_name in key[1]
-    ]
-
-
-def get_col_params(column: str, source: str, var_map: dict) -> dict:
-    """_summary_
-    Args:
-        column (str): _description_
-        source (str): _description_
-        var_map (dict): _description_
-    Returns:
-        dict: _description_
-    """
-    # TODO: might want to convert data type strings to actual types here
-    params = None
-
-    for key in var_map.keys():
-        if key[0] == column and (source in key[1] or key[1].lower() == "all"):
-            params = var_map[key]
-
-    return params
-
-
 def create_dfs_from_files(
-    file_paths: list[str], format_file: str, var_map: dict
+    file_paths: list[str], format_file: str = None, var_map: pd.DataFrame = None
 ) -> list[pd.DataFrame]:
     """Reads in a file as a DataFrame and performs data cleaning operations for
     each file in file_paths
@@ -175,13 +60,13 @@ def create_dfs_from_files(
     for file_path in file_paths:
         file_name = file_path.split("\\")[-1]

-        df = create_df_from_file(file_path, format_file)
-        logger.info(f"Shape of {file_name} when read: {df.shape}")
+        df = file_io.File(file_path, format_file).read()
+        logger.info("Shape of %s when read: %s", file_name, df.shape)

         df = operations.clean_df(df, file_name, var_map)
         df = operations.create_derived_cols(df, file_name)

-        logger.info(f"Shape of {file_name} after processing: {df.shape}")
+        logger.info("Shape of %s after processing: %s", file_name, df.shape)
         dfs.append(df)

     return dfs
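
End to end, the refactor routes everything through DataInfo: build it from the YAML config, then hand its file list and variable map to create_dfs_from_files. A minimal sketch of that flow, assuming a config like the one shown earlier; the path is hypothetical, and passing var_map_non_derived here is an assumption about how the derived/non-derived split is meant to be consumed.

from src import config, file_utils

data_info = config.create_data_info("configs/fulton.yaml")  # hypothetical path
dfs = file_utils.create_dfs_from_files(
    data_info.county_files,
    var_map=data_info.var_map_non_derived,  # assumed: derived columns handled downstream
)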