Docstrings to public methods and functions. #91

Merged
merged 1 commit on Apr 6, 2023
23 changes: 23 additions & 0 deletions quinn/dataframe_helpers.py
@@ -35,12 +35,26 @@ def two_columns_to_dictionary(


def to_list_of_dictionaries(df: DataFrame) -> List[Dict[str, Any]]:
"""Convert a Spark DataFrame to a list of dictionaries.

:param df: The Spark DataFrame to convert.
:type df: :py:class:`pyspark.sql.DataFrame`
:return: A list of dictionaries representing the rows in the DataFrame.
:rtype: List[Dict[str, Any]]
"""
return list(map(lambda r: r.asDict(), df.collect()))
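A minimal usage sketch of this helper, assuming an active SparkSession; the sample data is illustrative:

```python
from pyspark.sql import SparkSession
from quinn.dataframe_helpers import to_list_of_dictionaries

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("jose", 1), ("li", 2)], ["name", "age"])

# Each Row is converted to a plain dict via Row.asDict()
rows = to_list_of_dictionaries(df)
print(rows)  # [{'name': 'jose', 'age': 1}, {'name': 'li', 'age': 2}]
```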


def print_athena_create_table(
df: DataFrame, athena_table_name: str, s3location: str
) -> None:
"""Generates the Athena create table statement for a given DataFrame

:param df: The pyspark.sql.DataFrame to use
:param athena_table_name: The name of the athena table to generate
:param s3location: The S3 location of the parquet data
:return: None
"""
fields = df.schema

print(f"CREATE EXTERNAL TABLE IF NOT EXISTS `{athena_table_name}` ( ")
@@ -56,6 +70,15 @@ def print_athena_create_table(


def show_output_to_df(show_output: str, spark: SparkSession) -> DataFrame:
"""Show output as spark DataFrame

:param show_output: String representing output of 'show' command in spark
:type show_output: str
:param spark: SparkSession object
:type spark: SparkSession
:return: DataFrame object containing output of a show command in spark
:rtype: Dataframe
"""
l = show_output.split("\n")
ugly_column_names = l[1]
pretty_column_names = [i.strip() for i in ugly_column_names[1:-1].split("|")]
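A hedged sketch of the two helpers above; the table name, S3 path, and show-style grid are illustrative, and the grid assumes the exact format that ``df.show()`` prints:

```python
from pyspark.sql import SparkSession
from quinn.dataframe_helpers import print_athena_create_table, show_output_to_df

spark = SparkSession.builder.getOrCreate()

# Round-trip a captured show() grid back into a DataFrame
# (the parsed columns come back as strings)
show_output = """+----+---+
|name|age|
+----+---+
|jose|  1|
|  li|  2|
+----+---+"""
df = show_output_to_df(show_output, spark)

# Print an Athena CREATE TABLE statement for the DataFrame (hypothetical names)
print_athena_create_table(df, "my_table", "s3://my-bucket/my-prefix/")
```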
34 changes: 34 additions & 0 deletions quinn/dataframe_validator.py
@@ -18,6 +18,16 @@ class DataFrameProhibitedColumnError(ValueError):


def validate_presence_of_columns(df: DataFrame, required_col_names: List[str]) -> None:
"""Validates the presence of column names in a DataFrame.

:param df: A spark DataFrame.
:type df: DataFrame`
:param required_col_names: List of the required column names for the DataFrame.
:type required_col_names: :py:class:`list` of :py:class:`str`
:return: None.
:raises DataFrameMissingColumnError: if any of the requested column names are
not present in the DataFrame.
"""
all_col_names = df.columns
missing_col_names = [x for x in required_col_names if x not in all_col_names]
error_message = "The {missing_col_names} columns are not included in the DataFrame with the following columns {all_col_names}".format(
@@ -28,6 +38,21 @@ def validate_presence_of_columns(df: DataFrame, required_col_names: List[str]) -


def validate_schema(df: DataFrame, required_schema: StructType, ignore_nullable: bool=False) -> None:
"""
This function will validate that a given DataFrame has a given StructType as its
schema.

:param df: DataFrame to validate
:type df: DataFrame
:param required_schema: StructType required for the DataFrame
:type required_schema: StructType
:param ignore_nullable: (Optional) A flag for if nullable fields should be
ignored during validation
:type ignore_nullable: bool, optional

:raises DataFrameMissingStructFieldError: if any StructFields from the required
schema are not included in the DataFrame schema
"""
_all_struct_fields = copy.deepcopy(df.schema)
_required_schema = copy.deepcopy(required_schema)

@@ -48,6 +73,15 @@ def validate_schema(df: DataFrame, required_schema: StructType, ignore_nullable:


def validate_absence_of_columns(df: DataFrame, prohibited_col_names: List[str]) -> None:
"""
Validate that none of the prohibited column names are present among
specified DataFrame columns.

:param df: DataFrame containing columns to be checked.
:param prohibited_col_names: List of prohibited column names.
:raises DataFrameProhibitedColumnError: If the prohibited column names are
present among the specified DataFrame columns.
"""
all_col_names = df.columns
extra_col_names = [x for x in all_col_names if x in prohibited_col_names]
error_message = "The {extra_col_names} columns are not allowed to be included in the DataFrame with the following columns {all_col_names}".format(
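A short sketch of the three validators in action; the DataFrame and schema are illustrative, and each call passes silently unless it raises:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
from quinn.dataframe_validator import (
    validate_presence_of_columns,
    validate_schema,
    validate_absence_of_columns,
)

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("jose", 1)], ["name", "age"])

validate_presence_of_columns(df, ["name", "age"])  # ok
validate_absence_of_columns(df, ["ssn"])           # ok
validate_schema(
    df,
    StructType([
        StructField("name", StringType(), True),
        StructField("age", LongType(), True),
    ]),
    ignore_nullable=True,
)  # ok
# validate_presence_of_columns(df, ["city"]) would raise DataFrameMissingColumnError
```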
44 changes: 44 additions & 0 deletions quinn/extensions/column_ext.py
@@ -5,22 +5,66 @@


def isFalsy(self: Column) -> Column:
"""Returns a Column indicating whether all values in the Column are False or NULL
(**falsy**). Each element in the resulting column is True if all the elements in the
Column are either NULL or False, or False otherwise. This is accomplished by
performing a bitwise or of the ``isNull`` condition and a literal False value and
then wrapping the result in a **when** statement.

:param self: Column object
:returns: Column object
:rtype: Column
"""
return when(self.isNull() | (self == lit(False)), True).otherwise(False)
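A sketch of ``isFalsy`` on a nullable boolean column, assuming the extensions import attaches the method to ``Column`` (as the quinn extension modules are designed to do):

```python
from pyspark.sql import SparkSession
import quinn.extensions  # assumed to monkey-patch isFalsy onto Column

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(True,), (False,), (None,)], ["flag"])

# is_falsy is True for NULL and False inputs, False for True inputs
df.withColumn("is_falsy", df.flag.isFalsy()).show()
```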


def isTruthy(self: Column) -> Column:
"""Calculates a boolean expression that is the opposite of isFalsy for the given
``Column`` self.

:param Column self: The ``Column`` to calculate the opposite of isFalsy for.
:returns: A ``Column`` with the results of the calculation.
:rtype: Column
"""
return ~(self.isFalsy())


def isFalse(self: Column) -> Column:
"""This function checks if the column is equal to False and returns the column.

:param self: Column
:return: Column
:rtype: Column
"""
return self == False


def isTrue(self: Column) -> Column:
"""
This function takes a column of type Column as an argument and returns a column
of type Column.

It evaluates whether each element in the column argument is equal to True, and
if so will return True, otherwise False.

:param self: Column object
:returns: Column object
:rtype: Column
"""
return self == True


def isNullOrBlank(self: Column) -> Column:
"""Returns a Boolean value which expresses whether a given column is ``null`` or
contains only blank characters.

:param \*\*self: The :class:`Column` to check.

:returns: A `Column` containing ``True`` if the column is ``null`` or only contains
blank characters, or ``False`` otherwise.
:rtype: Column
"""

return (self.isNull()) | (trim(self) == "")
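A similar hedged sketch for ``isNullOrBlank``:

```python
from pyspark.sql import SparkSession
import quinn.extensions  # assumed to monkey-patch isNullOrBlank onto Column

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("hi",), ("   ",), (None,)], ["text"])

# True for NULL values and whitespace-only strings, False otherwise
df.withColumn("null_or_blank", df.text.isNullOrBlank()).show()
```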


11 changes: 11 additions & 0 deletions quinn/extensions/spark_session_ext.py
@@ -3,6 +3,17 @@


def create_df(self, rows_data, col_specs):
"""Creates a new DataFrame from the given data and column specs. The returned
DataFrame is created using the StructType and StructField classes provided by
PySpark.

:param rows_data: the data used to create the DataFrame
:type rows_data: array-like
:param col_specs: list of tuples containing the name and type of each field
:type col_specs: list of tuples
:return: a new DataFrame
:rtype: DataFrame
"""
struct_fields = list(map(lambda x: StructField(*x), col_specs))
return self.createDataFrame(data=rows_data, schema=StructType(struct_fields))
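A usage sketch, assuming the extensions import attaches ``create_df`` to ``SparkSession``:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
import quinn.extensions  # assumed to monkey-patch create_df onto SparkSession

spark = SparkSession.builder.getOrCreate()

# Each col spec tuple is splatted into StructField(name, dataType, nullable)
df = spark.create_df(
    [("jose", 1), ("li", 2)],
    [("name", StringType(), True), ("age", IntegerType(), True)],
)
df.show()
```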

129 changes: 129 additions & 0 deletions quinn/functions.py
@@ -8,43 +8,129 @@


def single_space(col: Column) -> Column:
"""This function takes a column and replaces all the multiple white spaces with a
single space. It then trims the column to make all the texts consistent.
:param col: The column which needs to be spaced
:type col: Column
:returns: A trimmed column with single space
:rtype: Column
"""
return F.trim(F.regexp_replace(col, " +", " "))
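For example (a minimal sketch with illustrative data):

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from quinn.functions import single_space

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("  hi   there  ",)], ["s"])

# "  hi   there  " -> "hi there"
df.select(single_space(F.col("s"))).show()
```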


def remove_all_whitespace(col: Column) -> Column:
"""This function takes a `Column` object as a parameter and returns a `Column` object
with all white space removed. It does this using the regexp_replace function
from F, which replaces all whitespace with an empty string.
:param col: a `Column` object
:type col: Column
:returns: a `Column` object with all white space removed
:rtype: Column
"""
return F.regexp_replace(col, "\\s+", "")


def anti_trim(col: Column) -> Column:
"""Removes whitespace from the boundaries of ``col`` using the regexp_replace
function.

:param col: Column on which to perform the regexp_replace.
:type col: Column
:return: A new Column with all whitespace removed from the boundaries.
:rtype: Column
"""
return F.regexp_replace(col, "\\b\\s+\\b", "")


def remove_non_word_characters(col: Column) -> Column:
"""Removes non-word characters from a column.

The non-word characters which will be removed are those identified by the
regular expression ``"[^\\w\\s]+"``. This expression matches any character
that is neither a word character (``\\w``) nor whitespace (``\\s``).

:param col: A Column object.
:return: A Column object with non-word characters removed.

"""
return F.regexp_replace(col, "[^\\w\\s]+", "")


def exists(f: Callable[[Any], bool]):
"""
Create a user-defined function that takes a list expressed as a column of
type ``ArrayType(AnyType)`` as an argument and returns a boolean value indicating
whether any element in the list is true according to the argument ``f`` of the
``exists()`` function.

:param f: Callable function - A callable function that takes an element of
type Any and returns a boolean value.
:return: A user-defined function that takes
a list expressed as a column of type ArrayType(AnyType) as an argument and
returns a boolean value indicating whether any element in the list is true
according to the argument ``f`` of the ``exists()`` function.
:rtype: UserDefinedFunction
"""

def temp_udf(l):
return any(map(f, l))

return F.udf(temp_udf, BooleanType())
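A minimal sketch of ``exists`` over an array column, with illustrative data:

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from quinn.functions import exists

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([([1, 2, 3],), ([-1, -2],)], ["nums"])

# True when any element is negative: [false, true]
df.withColumn("any_negative", exists(lambda n: n < 0)(F.col("nums"))).show()
```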


def forall(f: Callable[[Any], bool]):
"""The **forall** function allows for mapping a given boolean function to a list of
arguments and return a single boolean value as the result of applying the
boolean function to each element of the list. It does this by creating a Spark
UDF which takes in a list of arguments, applying the given boolean function to
each element of the list and returning a single boolean value if all the
elements pass through the given boolean function.

:param f: A callable function ``f`` which takes in any type and returns a boolean
:return: A spark UDF which accepts a list of arguments and returns True if all
elements pass through the given boolean function, False otherwise.
:rtype: UserDefinedFunction
"""

def temp_udf(l):
return all(map(f, l))

return F.udf(temp_udf, BooleanType())
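And the dual of the ``exists`` sketch above:

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from quinn.functions import forall

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([([1, 2, 3],), ([-1, 2],)], ["nums"])

# True only when every element is positive: [true, false]
df.withColumn("all_positive", forall(lambda n: n > 0)(F.col("nums"))).show()
```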


def multi_equals(value: Any):
"""Create a user-defined function that checks if all the given columns have the
designated value.

:param value: The designated value.
:type value: Any
:return: A user-defined function of type BooleanType().
:rtype: UserDefinedFunction
"""

def temp_udf(*cols):
return all(map(lambda col: col == value, cols))

return F.udf(temp_udf, BooleanType())
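For example, checking that two columns both hold the designated value:

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from quinn.functions import multi_equals

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", "a"), ("a", "b")], ["c1", "c2"])

# True only when both c1 and c2 equal "a": [true, false]
df.withColumn("both_a", multi_equals("a")(F.col("c1"), F.col("c2"))).show()
```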


def week_start_date(col: Column, week_start_day: str = "Sun") -> Column:
"""This function takes a Spark `Column` and an optional `week_start_day` string
argument and returns a `Column` with the corresponding start of week dates. The
"standard week" in Spark starts on Sunday, however an optional argument can be
used to start the week from a different day, e.g. Monday. The `week_start_day`
argument is a string corresponding to the day of the week to start the week
from, e.g. `"Mon"`, `"Tue"`, and must be in the set: `{"Sun", "Mon", "Tue", "Wed",
"Thu", "Fri", "Sat"}`. If the argument given is not a valid day then a `ValueError`
will be raised.

:param col: The column to determine start of week dates on
:type col: Column
:param week_start_day: The day to start the week on
:type week_start_day: str
:returns: A Column with start of week dates
:rtype: Column
"""
_raise_if_invalid_day(week_start_day)
# the "standard week" in Spark is from Sunday to Saturday
mapping = {
@@ -61,6 +147,22 @@ def week_start_date(col: Column, week_start_day: str = "Sun") -> Column:


def week_end_date(col: Column, week_end_day: str = "Sat") -> Column:
"""
Returns a date column for the end of week for a given day.

The Spark function `dayofweek` considers Sunday the first day of the week and
uses the default value of 1 to indicate it. The `when` and `otherwise`
functions compare the indicated end-of-week day with the computed day of week,
returning the reference date if they match and adding one week to the
reference date otherwise.

:param col: The reference date column.
:type col: Column
:param week_end_day: The week end day (default: 'Sat')
:type week_end_day: str
:return: A Column of end of the week dates.
:rtype: Column
"""
_raise_if_invalid_day(week_end_day)
# these are the default Spark mappings. Spark considers Sunday the first day of the week.
day_of_week_mapping = {
@@ -87,14 +189,41 @@ def _raise_if_invalid_day(day: str) -> None:


def approx_equal(col1: Column, col2: Column, threshhold: Number) -> Column:
"""Compares two ``Column`` objects by checking if the difference between them
is less than a specified ``threshhold``.

:param col1: the first ``Column``
:type col1: Column
:param col2: the second ``Column``
:type col2: Column
:param threshhold: the maximum difference for the two columns to be considered
    approximately equal
:type threshhold: Number
:return: Boolean ``Column`` with ``True`` indicating that ``abs(col1 -
col2)`` is less than ``threshhold``
"""
return F.abs(col1 - col2) < threshhold
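A minimal sketch with an illustrative threshold:

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from quinn.functions import approx_equal

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0, 1.05), (1.0, 2.0)], ["a", "b"])

# True where |a - b| < 0.1: [true, false]
df.withColumn("close", approx_equal(F.col("a"), F.col("b"), 0.1)).show()
```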


def array_choice(col: Column) -> Column:
"""Returns one random element from the given column.

:param col: Column from which element is chosen
:type col: Column
:return: random element from the given column
:rtype: Column
"""
index = (F.rand() * F.size(col)).cast("int")
return col[index]
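For example (the result is non-deterministic because it relies on ``F.rand()``):

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from quinn.functions import array_choice

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([([1, 2, 3],)], ["nums"])

# One random element per row, e.g. 2
df.withColumn("pick", array_choice(F.col("nums"))).show()
```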


@F.udf(returnType=ArrayType(StringType()))
def regexp_extract_all(s: str, regexp: str) -> Optional[List[re.Match]]:
"""This function uses the Python `re` library to extract regular expressions from a
string (`s`) using a regex pattern (`regexp`). It returns a list of all matches, or `None` if `s` is `None`.

:param s: input string (`Column`)
:type s: str
:param regexp: string `re` pattern
:return: List of matches
"""
return None if s == None else re.findall(regexp, s)
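A usage sketch; note the pattern must be passed as a Column (e.g. via ``F.lit``) because this is a UDF:

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from quinn.functions import regexp_extract_all

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("200 itemA 300 itemB",)], ["s"])

# ["200", "300"]
df.withColumn("nums", regexp_extract_all(F.col("s"), F.lit(r"\d+"))).show(truncate=False)
```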