Skip to content

Commit

Permalink
[0.2.0] Metadata Implementation (#81)
Browse files Browse the repository at this point in the history
* Update file structure

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update single_table.py

* Update metadata (still draft)

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update metadata and reset file structure

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add numeric inspector

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix metadata initialization

* fix type hints error in py38

* add inspectors

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update relationship.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update multi-table-combiner

* add composite key list in relationship

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Apply suggestions from code review

Use a single list for single or composite primary key.

Co-authored-by: Zhongsheng Ji <9573586@qq.com>

* Apply suggestions from code review

Keys are described using a list, which is compatible with single or composite foreign keys.

Co-authored-by: Zhongsheng Ji <9573586@qq.com>

* use a single list for primary key(s)

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update exceptions

* Update check functions

Unit testing also needs to be implemented.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Apply suggestions from code review

Co-authored-by: Zhongsheng Ji <9573586@qq.com>

* Apply suggestions from code review

Unit testing also needs to be implemented.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update some test case

still some cases not completed yet.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update test cases

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update metadata save and load test cases.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Zhongsheng Ji <9573586@qq.com>
  • Loading branch information
3 people committed Dec 27, 2023
1 parent 897e252 commit 9b4c683
Show file tree
Hide file tree
Showing 13 changed files with 673 additions and 13 deletions.
39 changes: 39 additions & 0 deletions sdgx/data_models/inspectors/bool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

from typing import Any

import pandas as pd
from pandas._libs.tslibs.parsing import DateParseError

from sdgx.data_models.inspectors.base import Inspector
from sdgx.data_models.inspectors.extension import hookimpl


class BoolInspector(Inspector):
    """Inspector that detects boolean columns in a DataFrame."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Names of columns inferred to hold boolean values.
        self.bool_columns: set[str] = set()

    def fit(self, raw_data: pd.DataFrame):
        """Fit the inspector.

        Records the set of boolean columns from the raw data.
        (Docstring fixed: this inspector collects bool columns, not
        discrete columns.)

        Args:
            raw_data (pd.DataFrame): Raw data
        """
        # infer_objects() upgrades object-dtype columns that actually
        # hold plain Python bools so select_dtypes can find them.
        self.bool_columns = self.bool_columns.union(
            set(raw_data.infer_objects().select_dtypes(include=["bool"]).columns)
        )

        self.ready = True

    def inspect(self) -> dict[str, Any]:
        """Inspect raw data and generate metadata."""

        return {"bool_columns": list(self.bool_columns)}


@hookimpl
def register(manager):
    # Plugin hook: expose BoolInspector to the InspectorManager under
    # its class name.
    manager.register("BoolInspector", BoolInspector)
63 changes: 63 additions & 0 deletions sdgx/data_models/inspectors/datetime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

from typing import Any

import pandas as pd
from pandas._libs.tslibs.parsing import DateParseError

from sdgx.data_models.inspectors.base import Inspector
from sdgx.data_models.inspectors.extension import hookimpl


class DatetimeInspector(Inspector):
    """Inspector that detects datetime columns in a DataFrame."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Names of columns detected as (or parseable to) datetimes.
        self.datetime_columns: set[str] = set()

    @classmethod
    def can_convert_to_datetime(cls, input_col: pd.Series):
        """Whether a df column can be converted to datetime.

        Args:
            input_col (pd.Series): A column of a dataframe.
        """
        try:
            pd.to_datetime(input_col)
            return True
        except DateParseError:
            return False
        # Other parse failures (e.g. ValueError/TypeError for non-date
        # values). Use `except Exception` rather than a bare `except:`
        # so KeyboardInterrupt/SystemExit are not swallowed.
        except Exception:
            return False

    def fit(self, raw_data: pd.DataFrame):
        """Fit the inspector.

        Records columns already typed as datetime64, plus object-dtype
        columns whose values can be parsed as datetimes.

        Args:
            raw_data (pd.DataFrame): Raw data
        """
        self.datetime_columns = self.datetime_columns.union(
            set(raw_data.infer_objects().select_dtypes(include=["datetime64"]).columns)
        )

        # Some columns containing dates remain `object` dtype even after
        # inference; probe each one by attempting a datetime parse.
        candidate_columns = set(raw_data.select_dtypes(include=["object"]).columns)
        for col_name in candidate_columns:
            each_col = raw_data[col_name]
            if DatetimeInspector.can_convert_to_datetime(each_col):
                self.datetime_columns.add(col_name)

        self.ready = True

    def inspect(self) -> dict[str, Any]:
        """Inspect raw data and generate metadata."""

        return {"datetime_columns": list(self.datetime_columns)}


@hookimpl
def register(manager):
    # Plugin hook: expose DatetimeInspector to the InspectorManager
    # under its class name.
    manager.register("DatetimeInspector", DatetimeInspector)
44 changes: 44 additions & 0 deletions sdgx/data_models/inspectors/id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import annotations

from typing import Any

import pandas as pd

from sdgx.data_models.inspectors.base import Inspector
from sdgx.data_models.inspectors.extension import hookimpl


class IDInspector(Inspector):
    """Inspector that detects ID-like columns (every value distinct)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Names of columns whose values are all unique across rows.
        self.ID_columns: set[str] = set()

    def fit(self, raw_data: pd.DataFrame):
        """Fit the inspector.

        Flags object/int64 columns whose distinct-value count equals
        the row count as ID columns.

        Args:
            raw_data (pd.DataFrame): Raw data
        """

        df_length = len(raw_data)
        candidate_columns = set(raw_data.select_dtypes(include=["object", "int64"]).columns)

        # Guard the empty-table case: with 0 rows every candidate column
        # would trivially satisfy `distinct == rows` and be misflagged.
        if df_length > 0:
            for each_col_name in candidate_columns:
                target_col = raw_data[each_col_name]
                col_set_length = len(set(target_col))
                if col_set_length == df_length:
                    self.ID_columns.add(each_col_name)

        self.ready = True

    def inspect(self) -> dict[str, Any]:
        """Inspect raw data and generate metadata."""

        return {"id_columns": list(self.ID_columns)}


@hookimpl
def register(manager):
    # Plugin hook: expose IDInspector to the InspectorManager under
    # its class name.
    manager.register("IDInspector", IDInspector)
38 changes: 38 additions & 0 deletions sdgx/data_models/inspectors/numeric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

from typing import Any

import pandas as pd

from sdgx.data_models.inspectors.base import Inspector
from sdgx.data_models.inspectors.extension import hookimpl


class NumericInspector(Inspector):
    """Inspector that detects numeric columns in a DataFrame."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Names of columns with a numeric dtype.
        self.numeric_columns: set[str] = set()

    def fit(self, raw_data: pd.DataFrame):
        """Fit the inspector.

        Records the set of numeric columns from the raw data.
        (Docstring fixed: this inspector collects numeric columns, not
        discrete columns.)

        Args:
            raw_data (pd.DataFrame): Raw data
        """

        # NOTE(review): only float64/int64 are matched; 32-bit or
        # nullable numeric dtypes would be missed — confirm intended.
        self.numeric_columns = self.numeric_columns.union(
            set(raw_data.select_dtypes(include=["float64", "int64"]).columns)
        )
        self.ready = True

    def inspect(self) -> dict[str, Any]:
        """Inspect raw data and generate metadata."""

        return {"numeric_columns": list(self.numeric_columns)}


@hookimpl
def register(manager):
    # Plugin hook: expose NumericInspector to the InspectorManager
    # under its class name.
    manager.register("NumericInspector", NumericInspector)
144 changes: 131 additions & 13 deletions sdgx/data_models/metadata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import json
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List

Expand All @@ -10,23 +9,42 @@

from sdgx.data_loader import DataLoader
from sdgx.data_models.inspectors.manager import InspectorManager
from sdgx.exceptions import MetadataInitError
from sdgx.exceptions import MetadataInitError, MetadataInvalidError
from sdgx.utils import logger

# TODO: Design metadata for relationships...
# class DType(Enum):
# datetime = "datetime"
# timestamp = "timestamp"
# numeric = "numeric"
# category = "category"

class Metadata(BaseModel):
"""Metadata
# class Relationship:
# pass
This metadata is mainly used to describe the data types of all columns in a single data table.
For each column, there should be an instance of the Data Type object.
class Metadata(BaseModel):
Args:
primary_keys(List[str]): The primary key, a field used to uniquely identify each row in the table.
The primary key of each row must be unique and not empty.
column_list(list[str]): list of the comlumn name in the table, other columns lists are used to store column information.
"""

# for primary key
# compatible with single primary key or composite primary key
primary_keys: List[str] = []

# variables related to columns
# column_list is used to store all columns' name
column_list: List[str] = []

# other columns lists are used to store column information
# here are 5 basic data types
id_columns: List[str] = []
numeric_columns: List[str] = []
bool_columns: List[str] = []
discrete_columns: List[str] = []
datetime_columns: List[str] = []

# version info
metadata_version: str = "1.0"
_extend: Dict[str, Any] = {}

def get(self, key: str, default=None) -> Any:
Expand All @@ -52,10 +70,27 @@ def from_dataloader(
cls,
dataloader: DataLoader,
max_chunk: int = 10,
primary_keys: List[str] = None,
include_inspectors: list[str] | None = None,
exclude_inspectors: list[str] | None = None,
inspector_init_kwargs: dict[str, Any] | None = None,
) -> "Metadata":
"""Initialize a metadata from DataLoader and Inspectors
Args:
dataloader(DataLoader): the input DataLoader.
max_chunk(int): max chunk count.
primary_key(list(str) | str): the primary key of this table.
Use the first column in table by default.
include_inspectors(list[str]): data type inspectors that should included in this metadata (table).
exclude_inspectors(list[str]): data type inspectors that should NOT included in this metadata (table).
inspector_init_kwargs(dict): inspector args.
"""
logger.info("Inspecting metadata...")
inspectors = InspectorManager().init_inspcetors(
include_inspectors, exclude_inspectors, **(inspector_init_kwargs or {})
Expand All @@ -66,7 +101,11 @@ def from_dataloader(
if all(i.ready for i in inspectors) or i > max_chunk:
break

metadata = Metadata()
# If primary_key is not specified, use the first column (in list).
if primary_keys is None:
primary_keys = [dataloader.columns()[0]]

metadata = Metadata(primary_keys=primary_keys, column_list=dataloader.columns())
for inspector in inspectors:
metadata.update(inspector.inspect())

Expand All @@ -86,7 +125,7 @@ def from_dataframe(
for inspector in inspectors:
inspector.fit(df)

metadata = Metadata()
metadata = Metadata(primary_keys=[df.columns[0]], column_list=list(df.columns))
for inspector in inspectors:
metadata.update(inspector.inspect())

Expand All @@ -101,3 +140,82 @@ def load(cls, path: str | Path) -> "Metadata":
path = Path(path).expanduser().resolve()
attributes = json.load(path.open("r"))
return Metadata().update(attributes)

def check_single_primary_key(self, input_key: str):
    """Check whether a primary key is in ``column_list`` and has the ID data type.

    Args:
        input_key (str): the input primary key name.

    Raises:
        MetadataInvalidError: if the key is not a known column, or is
            not listed as an ID-typed column.
    """

    # Error messages fixed for grammar ("not Exist" / "should has").
    if input_key not in self.column_list:
        raise MetadataInvalidError(f"Primary key {input_key} does not exist in columns.")
    if input_key not in self.id_columns:
        raise MetadataInvalidError(f"Primary key {input_key} should have the ID data type.")

def get_all_data_type_columns(self):
    """Collect every column name declared in a ``*_columns`` field.

    Scans both the pydantic model fields and the extended (``_extend``)
    fields, merging every list whose field name ends with ``_columns``.

    Returns:
        set: union of all declared column names.
    """
    # Field names to scan: declared model fields first, then extras.
    candidate_keys = list(self.model_fields.keys()) + list(self._extend.keys())

    collected = set()
    for field_name in candidate_keys:
        if field_name.endswith("_columns"):
            collected.update(self.get(field_name))

    return collected

def check(self):
    """Checks column info.

    When passing as input to the next module, perform necessary checks:
    - each primary key is correctly defined (in column list) and has
      the ID data type,
    - every column in the table has a data-type definition,
    - no data-type list references a column missing from the table.

    Raises:
        MetadataInvalidError: on any violation above.
    """
    # Primary keys must exist in the table and be ID-typed.
    for key in self.primary_keys:
        self.check_single_primary_key(key)

    declared = set(self.get_all_data_type_columns())
    known = set(self.column_list)

    # Columns present in the table but lacking a data-type definition.
    if known - declared:
        raise MetadataInvalidError(f"Undefined data type for column {known - declared}.")

    # Data-type entries that name columns absent from the table.
    if declared - known:
        raise MetadataInvalidError(f"Found undefined column: {declared - known}.")

    logger.debug("Metadata check succeed.")

def update_primary_key(self, primary_keys: List[str]):
    """Update the primary key of the table.

    The original primary key is erased and fully replaced.

    Args:
        primary_keys (List[str]): the primary key (single or composite)
            columns of this table.

    Raises:
        MetadataInvalidError: if the input is not a list, or names a
            column that is not in the table.
    """

    # Check against the builtin `list`: `isinstance(x, typing.List)`
    # is deprecated typing usage.
    if not isinstance(primary_keys, list):
        raise MetadataInvalidError("Primary key should be a list.")

    for each_key in primary_keys:
        if each_key not in self.column_list:
            # Include the offending key so the failure is actionable.
            raise MetadataInvalidError(
                f"Primary key {each_key} does not exist in table columns."
            )

    self.primary_keys = primary_keys

    logger.info(f"Primary Key updated: {primary_keys}.")
Loading

0 comments on commit 9b4c683

Please sign in to comment.