From 7e17c4d361caeb7a53336dcb7305f3048241c522 Mon Sep 17 00:00:00 2001
From: Sem Sinchenko
Date: Thu, 6 Apr 2023 13:58:21 +0200
Subject: [PATCH] Docstrings to public methods and functions.

Created with OpenAI text-davinci-003 and validated/fixed by me.

On branch feature/90-docstrings_public_functions
Changes to be committed:
- modified: quinn/dataframe_helpers.py
- modified: quinn/dataframe_validator.py
- modified: quinn/extensions/column_ext.py
- modified: quinn/extensions/spark_session_ext.py
- modified: quinn/functions.py
- modified: quinn/transformations.py
---
 quinn/dataframe_helpers.py            |  23 +++++
 quinn/dataframe_validator.py          |  34 +++++++
 quinn/extensions/column_ext.py        |  44 +++++++++
 quinn/extensions/spark_session_ext.py |  11 +++
 quinn/functions.py                    | 129 ++++++++++++++++++++++++++
 quinn/transformations.py              |  55 +++++++++++
 6 files changed, 296 insertions(+)

diff --git a/quinn/dataframe_helpers.py b/quinn/dataframe_helpers.py
index 27557058..ae3b4884 100644
--- a/quinn/dataframe_helpers.py
+++ b/quinn/dataframe_helpers.py
@@ -35,12 +35,26 @@ def two_columns_to_dictionary(


 def to_list_of_dictionaries(df: DataFrame) -> List[Dict[str, Any]]:
+    """Convert a Spark DataFrame to a list of dictionaries.
+
+    :param df: The Spark DataFrame to convert.
+    :type df: :py:class:`pyspark.sql.DataFrame`
+    :return: A list of dictionaries representing the rows in the DataFrame.
+    :rtype: List[Dict[str, Any]]
+    """
     return list(map(lambda r: r.asDict(), df.collect()))


 def print_athena_create_table(
     df: DataFrame, athena_table_name: str, s3location: str
 ) -> None:
+    """Generate the Athena create table statement for a given DataFrame.
+
+    :param df: The pyspark.sql.DataFrame to use
+    :param athena_table_name: The name of the Athena table to generate
+    :param s3location: The S3 location of the Parquet data
+    :return: None
+    """
     fields = df.schema

     print(f"CREATE EXTERNAL TABLE IF NOT EXISTS `{athena_table_name}` ( ")
@@ -56,6 +70,15 @@ def print_athena_create_table(


 def show_output_to_df(show_output: str, spark: SparkSession) -> DataFrame:
+    """Convert the string output of a Spark ``show`` command into a DataFrame.
+
+    :param show_output: String representing the output of a 'show' command in Spark
+    :type show_output: str
+    :param spark: SparkSession object
+    :type spark: SparkSession
+    :return: DataFrame object containing the output of a show command in Spark
+    :rtype: DataFrame
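+
+    A minimal usage sketch, assuming an active ``SparkSession`` named ``spark``
+    (the table contents are illustrative):
+
+    .. code-block:: python
+
+        show_str = (
+            "+----+---+\n"
+            "|name|age|\n"
+            "+----+---+\n"
+            "|jose|  1|\n"
+            "+----+---+"
+        )
+        df = show_output_to_df(show_str, spark)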
+ """ all_col_names = df.columns missing_col_names = [x for x in required_col_names if x not in all_col_names] error_message = "The {missing_col_names} columns are not included in the DataFrame with the following columns {all_col_names}".format( @@ -28,6 +38,21 @@ def validate_presence_of_columns(df: DataFrame, required_col_names: List[str]) - def validate_schema(df: DataFrame, required_schema: StructType, ignore_nullable: bool=False) -> None: + """ + This function will validate that a given DataFrame has a given StructType as its + schema. + + :param df: DataFrame to validate + :type df: DataFrame + :param required_schema: StructType required for the DataFrame + :type required_schema: StructType + :param ignore_nullable: (Optional) A flag for if nullable fields should be + ignored during validation + :type ignore_nullable: bool, optional + + :raises DataFrameMissingStructFieldError: if any StructFields from the required + schema are not included in the DataFrame schema + """ _all_struct_fields = copy.deepcopy(df.schema) _required_schema = copy.deepcopy(required_schema) @@ -48,6 +73,15 @@ def validate_schema(df: DataFrame, required_schema: StructType, ignore_nullable: def validate_absence_of_columns(df: DataFrame, prohibited_col_names: List[str]) -> None: + """ + Validate that none of the prohibited column names are present among + specified DataFrame columns. + + :param df: DataFrame containing columns to be checked. + :param prohibited_col_names: List of prohibited column names. + :raises DataFrameProhibitedColumnError: If the prohibited column names are + present among the specified DataFrame columns. + """ all_col_names = df.columns extra_col_names = [x for x in all_col_names if x in prohibited_col_names] error_message = "The {extra_col_names} columns are not allowed to be included in the DataFrame with the following columns {all_col_names}".format( diff --git a/quinn/extensions/column_ext.py b/quinn/extensions/column_ext.py index 0b6693ef..73dfb5c4 100644 --- a/quinn/extensions/column_ext.py +++ b/quinn/extensions/column_ext.py @@ -5,22 +5,66 @@ def isFalsy(self: Column) -> Column: + """Returns a Column indicating whether all values in the Column are False or NULL + (**falsy**). Each element in the resulting column is True if all the elements in the + Column are either NULL or False, or False otherwise. This is accomplished by + performing a bitwise or of the ``isNull`` condition and a literal False value and + then wrapping the result in a **when** statement. + + :param self: Column object + :returns: Column object + :rtype: Column + """ return when(self.isNull() | (self == lit(False)), True).otherwise(False) def isTruthy(self: Column) -> Column: + """Calculates a boolean expression that is the opposite of isFalsy for the given + ``Column`` self. + + :param Column self: The ``Column`` to calculate the opposite of isFalsy for. + :returns: A ``Column`` with the results of the calculation. + :rtype: Column + """ return ~(self.isFalsy()) def isFalse(self: Column) -> Column: + """This function checks if the column is equal to False and returns the column. + + :param self: Column + :return: Column + :rtype: Column + """ return self == False def isTrue(self: Column) -> Column: + """ + This function takes a column of type Column as an argument and returns a column + of type Column. + + It evaluates whether each element in the column argument is equal to True, and + if so will return True, otherwise False. 
+    """
     _all_struct_fields = copy.deepcopy(df.schema)
     _required_schema = copy.deepcopy(required_schema)

@@ -48,6 +73,15 @@ def validate_schema(df: DataFrame, required_schema: StructType, ignore_nullable:


 def validate_absence_of_columns(df: DataFrame, prohibited_col_names: List[str]) -> None:
+    """Validate that none of the prohibited column names are present among the
+    specified DataFrame columns.
+
+    :param df: DataFrame containing columns to be checked.
+    :param prohibited_col_names: List of prohibited column names.
+    :raises DataFrameProhibitedColumnError: If any of the prohibited column names
+        are present among the specified DataFrame columns.
+    """
     all_col_names = df.columns
     extra_col_names = [x for x in all_col_names if x in prohibited_col_names]
     error_message = "The {extra_col_names} columns are not allowed to be included in the DataFrame with the following columns {all_col_names}".format(

diff --git a/quinn/extensions/column_ext.py b/quinn/extensions/column_ext.py
index 0b6693ef..73dfb5c4 100644
--- a/quinn/extensions/column_ext.py
+++ b/quinn/extensions/column_ext.py
@@ -5,22 +5,66 @@


 def isFalsy(self: Column) -> Column:
+    """Return a Column indicating whether each value in this Column is False or
+    NULL (**falsy**). Each element of the resulting Column is True if the
+    corresponding element is NULL or False, and False otherwise. This is
+    accomplished by taking the logical OR of the ``isNull`` condition and an
+    equality comparison with a literal False value, and wrapping the result in a
+    **when** statement.
+
+    :param self: Column object
+    :returns: Column object
+    :rtype: Column
+    """
     return when(self.isNull() | (self == lit(False)), True).otherwise(False)


 def isTruthy(self: Column) -> Column:
+    """Calculate a boolean expression that is the opposite of isFalsy for the
+    given ``Column``.
+
+    :param Column self: The ``Column`` to calculate the opposite of isFalsy for.
+    :returns: A ``Column`` with the results of the calculation.
+    :rtype: Column
+    """
     return ~(self.isFalsy())


 def isFalse(self: Column) -> Column:
+    """Check element-wise whether the column is equal to False.
+
+    :param self: Column
+    :return: A Column of booleans, True where the value is False
+    :rtype: Column
+    """
     return self == False


 def isTrue(self: Column) -> Column:
+    """Check element-wise whether the column is equal to True.
+
+    Each element of the returned Column is True if the corresponding element of
+    this Column equals True, and False otherwise.
+
+    :param self: Column object
+    :returns: Column object
+    :rtype: Column
+    """
     return self == True


 def isNullOrBlank(self: Column) -> Column:
+    """Return a boolean Column which expresses whether a given column is ``null``
+    or contains only blank characters.
+
+    :param self: The :class:`Column` to check.
+
+    :returns: A `Column` containing ``True`` if the column is ``null`` or only
+        contains blank characters, or ``False`` otherwise.
+    :rtype: Column
+    """
+
     return (self.isNull()) | (trim(self) == "")

diff --git a/quinn/extensions/spark_session_ext.py b/quinn/extensions/spark_session_ext.py
index 1fea90bc..234af4d4 100644
--- a/quinn/extensions/spark_session_ext.py
+++ b/quinn/extensions/spark_session_ext.py
@@ -3,6 +3,17 @@


 def create_df(self, rows_data, col_specs):
+    """Create a new DataFrame from the given data and column specs. The returned
+    DataFrame is created using the StructType and StructField classes provided by
+    PySpark.
+
+    :param rows_data: the data used to create the DataFrame
+    :type rows_data: array-like
+    :param col_specs: list of tuples containing the name and type of the field
+    :type col_specs: list of tuples
+    :return: a new DataFrame
+    :rtype: DataFrame
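+
+    A minimal sketch, assuming the quinn SparkSession extensions are loaded and
+    an active session named ``spark``:
+
+    .. code-block:: python
+
+        from pyspark.sql.types import StringType, IntegerType
+
+        df = spark.create_df(
+            [("jose", 1), ("li", 2)],
+            [("name", StringType(), True), ("age", IntegerType(), True)],
+        )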
+    """
     struct_fields = list(map(lambda x: StructField(*x), col_specs))
     return self.createDataFrame(data=rows_data, schema=StructType(struct_fields))

diff --git a/quinn/functions.py b/quinn/functions.py
index d914120f..0560d1cc 100644
--- a/quinn/functions.py
+++ b/quinn/functions.py
@@ -8,22 +8,70 @@


 def single_space(col: Column) -> Column:
+    """Replace each run of multiple whitespace characters in a column with a
+    single space, then trim the column to make the text consistent.
+
+    :param col: The column which needs to be spaced
+    :type col: Column
+    :returns: A trimmed column with single spaces
+    :rtype: Column
+    """
     return F.trim(F.regexp_replace(col, " +", " "))


 def remove_all_whitespace(col: Column) -> Column:
+    """Remove all white space from a `Column`. It does this using the
+    ``F.regexp_replace`` function, which replaces all whitespace with an empty
+    string.
+
+    :param col: a `Column` object
+    :type col: Column
+    :returns: a `Column` object with all white space removed
+    :rtype: Column
+    """
     return F.regexp_replace(col, "\\s+", "")


 def anti_trim(col: Column) -> Column:
+    """Remove whitespace from the interior of ``col`` (whitespace between word
+    boundaries) using the regexp_replace function, while keeping any leading and
+    trailing whitespace.
+
+    :param col: Column on which to perform the regexp_replace.
+    :type col: Column
+    :return: A new Column with interior whitespace removed.
+    :rtype: Column
+    """
     return F.regexp_replace(col, "\\b\\s+\\b", "")


 def remove_non_word_characters(col: Column) -> Column:
+    """Remove non-word characters from a column.
+
+    The non-word characters which will be removed are those identified by the
+    regular expression ``"[^\\w\\s]+"``. This expression represents any character
+    that is not a word character (``\\w``) or whitespace (``\\s``).
+
+    :param col: A Column object.
+    :return: A Column object with non-word characters removed.
+    """
     return F.regexp_replace(col, "[^\\w\\s]+", "")


 def exists(f: Callable[[Any], bool]):
+    """Create a user-defined function that takes a list expressed as a column of
+    type ``ArrayType(AnyType)`` as an argument and returns a boolean value
+    indicating whether any element in the list is true according to the
+    predicate ``f``.
+
+    :param f: A callable function that takes an element of type Any and returns
+        a boolean value.
+    :return: A user-defined function that takes a list column and returns a
+        boolean value indicating whether any element satisfies ``f``.
+    :rtype: UserDefinedFunction
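+
+    A minimal sketch, assuming an active ``SparkSession`` named ``spark``:
+
+    .. code-block:: python
+
+        df = spark.createDataFrame([([1, 2, 3],), ([-1, -2],)], ["nums"])
+        df = df.withColumn("any_negative", exists(lambda n: n < 0)(df["nums"]))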
+    """
+
     def temp_udf(l):
         return any(map(f, l))

@@ -31,6 +79,19 @@ def temp_udf(l):


 def forall(f: Callable[[Any], bool]):
+    """Map a given boolean function over a list of arguments and return a single
+    boolean value as the result of applying the boolean function to each element
+    of the list. It does this by creating a Spark UDF which takes in a list of
+    arguments, applies the given boolean function to each element of the list,
+    and returns True only if all the elements pass through the given boolean
+    function.
+
+    :param f: A callable function ``f`` which takes in any type and returns a boolean
+    :return: A Spark UDF which accepts a list of arguments and returns True if all
+        elements pass through the given boolean function, False otherwise.
+    :rtype: UserDefinedFunction
+    """
+
     def temp_udf(l):
         return all(map(f, l))

@@ -38,6 +99,15 @@ def temp_udf(l):


 def multi_equals(value: Any):
+    """Create a user-defined function that checks if all the given columns have the
+    designated value.
+
+    :param value: The designated value.
+    :type value: Any
+    :return: A user-defined function of type BooleanType().
+    :rtype: UserDefinedFunction
+    """
+
     def temp_udf(*cols):
         return all(map(lambda col: col == value, cols))

@@ -45,6 +115,22 @@ def temp_udf(*cols):


 def week_start_date(col: Column, week_start_day: str = "Sun") -> Column:
+    """This function takes a Spark `Column` and an optional `week_start_day`
+    string argument and returns a `Column` with the corresponding start of week
+    dates. The "standard week" in Spark starts on Sunday, but an optional
+    argument can be used to start the week from a different day, e.g. Monday.
+    The `week_start_day` argument is a string corresponding to the day of the
+    week to start the week from, e.g. `"Mon"`, `"Tue"`, and must be in the set:
+    `{"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"}`. If the argument given
+    is not a valid day, a `ValueError` will be raised.
+
+    :param col: The column to determine start of week dates on
+    :type col: Column
+    :param week_start_day: The day to start the week on
+    :type week_start_day: str
+    :returns: A Column with start of week dates
+    :rtype: Column
+    """
     _raise_if_invalid_day(week_start_day)
     # the "standard week" in Spark is from Sunday to Saturday
     mapping = {
@@ -61,6 +147,22 @@ def week_start_date(col: Column, week_start_day: str = "Sun") -> Column:


 def week_end_date(col: Column, week_end_day: str = "Sat") -> Column:
+    """Return a date column for the end of week for a given day.
+
+    The Spark function `dayofweek` considers Sunday as the first day of the week,
+    and uses the default value of 1 to indicate Sunday. Usage of the `when` and
+    `otherwise` functions allows a comparison between the end of week day
+    indicated and the day of week computed, and the return of the reference date
+    if they match or the addition of one week to the reference date otherwise.
+
+    :param col: The reference date column.
+    :type col: Column
+    :param week_end_day: The week end day (default: 'Sat')
+    :type week_end_day: str
+    :return: A Column of end of the week dates.
+    :rtype: Column
+    """
     _raise_if_invalid_day(week_end_day)
     # these are the default Spark mappings. Spark considers Sunday the first day of the week.
     day_of_week_mapping = {
@@ -87,14 +189,41 @@ def _raise_if_invalid_day(day: str) -> None:


 def approx_equal(col1: Column, col2: Column, threshhold: Number) -> Column:
+    """Compare two ``Column`` objects by checking whether the absolute difference
+    between them is less than a specified ``threshhold``.
+
+    :param col1: the first ``Column``
+    :type col1: Column
+    :param col2: the second ``Column``
+    :type col2: Column
+    :param threshhold: the maximum allowed difference between the two columns
+    :type threshhold: Number
+    :return: Boolean ``Column`` with ``True`` indicating that ``abs(col1 -
+        col2)`` is less than ``threshhold``
+    """
     return F.abs(col1 - col2) < threshhold


 def array_choice(col: Column) -> Column:
+    """Return one random element from the given array column.
+
+    :param col: Column from which the element is chosen
+    :type col: Column
+    :return: A random element from the given column
+    :rtype: Column
+    """
     index = (F.rand() * F.size(col)).cast("int")
     return col[index]


 @F.udf(returnType=ArrayType(StringType()))
 def regexp_extract_all(s: str, regexp: str) -> Optional[List[re.Match]]:
+    """Extract all matches of a regex pattern (`regexp`) from a string (`s`)
+    using the Python `re` library. It returns a list of all matched substrings,
+    or `None` if `s` is `None`.
+
+    :param s: input string (`Column`)
+    :type s: str
+    :param regexp: string `re` pattern
+    :return: List of matches
+    """
     return None if s == None else re.findall(regexp, s)

diff --git a/quinn/transformations.py b/quinn/transformations.py
index 779b7ebf..2a9a6ee9 100644
--- a/quinn/transformations.py
+++ b/quinn/transformations.py
@@ -5,6 +5,19 @@


 def with_columns_renamed(fun: Callable[[str], str]) -> Callable[[DataFrame], DataFrame]:
+    """Create a function that renames the columns of a `Spark DataFrame`.
+
+    It takes a `Callable[[str], str]` object as an argument (``fun``) and returns
+    a `Callable[[DataFrame], DataFrame]` object.
+
+    When `_()` is called on a `DataFrame`, it creates a list of new column names,
+    applying the argument `fun()` to each of them, and returns a new `DataFrame`
+    with the new column names.
+
+    :param fun: Renaming function
+    :returns: Function which takes a DataFrame as parameter.
+    """
     def _(df: DataFrame) -> DataFrame:
         cols = list(
             map(
@@ -20,6 +33,20 @@ def _(df: DataFrame) -> DataFrame:

 def with_some_columns_renamed(
     fun: Callable[[str], str], change_col_name: Callable[[str], str]
 ) -> Callable[[DataFrame], DataFrame]:
+    """Create a `Callable[[DataFrame], DataFrame]` which takes a `DataFrame` and
+    returns a `DataFrame` with some of its columns renamed, as selected by a
+    predicate.
+
+    :param fun: A function that takes a column name as a string and returns a
+        new name as a string.
+    :type fun: `Callable[[str], str]`
+    :param change_col_name: A function that takes a column name as a string and
+        returns a boolean indicating whether that column should be renamed.
+    :type change_col_name: `Callable[[str], bool]`
+    :return: A `Callable[[DataFrame], DataFrame]`, which takes a
+        `DataFrame` and returns a `DataFrame` with some of its columns renamed.
+    :rtype: `Callable[[DataFrame], DataFrame]`
+    """
     def _(df):
         cols = list(
             map(
@@ -35,14 +62,42 @@ def _(df):


 def snake_case_col_names(df: DataFrame) -> DataFrame:
+    """Take a ``DataFrame`` instance and return a new ``DataFrame`` with all
+    column names converted to snake case (e.g. ``col_name_1``). It uses the
+    ``to_snake_case`` function in conjunction with the ``with_columns_renamed``
+    function to achieve this.
+
+    :param df: A ``DataFrame`` instance to process
+    :type df: ``DataFrame``
+    :return: A ``DataFrame`` instance with column names converted to snake case
+    :rtype: ``DataFrame``
+    """
     return with_columns_renamed(to_snake_case)(df)


 def to_snake_case(s: str) -> str:
+    """Take a string and convert it to snake case format.
+
+    :param s: The string to be converted.
+    :type s: str
+    :return: The string in snake case format.
+    :rtype: str
+    """
     return s.lower().replace(" ", "_")


 def sort_columns(df: DataFrame, sort_order: str) -> DataFrame:
+    """Sort the columns of a given DataFrame based on a given sort order. The
+    ``sort_order`` parameter can either be ``asc`` or ``desc``, which correspond
+    to ascending and descending order, respectively. If any other value is
+    provided for the ``sort_order`` parameter, a ``ValueError`` will be raised.
+
+    :param df: A DataFrame
+    :type df: pyspark.sql.DataFrame
+    :param sort_order: The order in which to sort the columns in the DataFrame
+    :type sort_order: str
+    :return: A DataFrame with the columns sorted in the chosen order
+    :rtype: pyspark.sql.DataFrame
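+
+    A minimal sketch, assuming an active ``SparkSession`` named ``spark``:
+
+    .. code-block:: python
+
+        df = spark.createDataFrame([(1, "a", True)], ["cherry", "apple", "banana"])
+        sort_columns(df, "asc").columns  # ['apple', 'banana', 'cherry']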
+    """
     sorted_col_names = None
     if sort_order == "asc":
         sorted_col_names = sorted(df.columns)