Skip to content

Commit

Permalink
feat: add dbt-athena adapter support for column types mapping (datahu…
Browse files Browse the repository at this point in the history
…b-project#8116)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
  • Loading branch information
svdimchenko and hsheth2 committed May 27, 2023
1 parent e14d7ed commit 6adb496
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 5 deletions.
3 changes: 3 additions & 0 deletions metadata-ingestion/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ coverage.xml
.pytest_cache/
junit.*xml

# Integration tests artifacts
tests/integrations/

# Translations
*.mo
*.pot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.sql.sql_types import (
ATHENA_SQL_TYPES_MAP,
BIGQUERY_TYPES_MAP,
POSTGRES_TYPES_MAP,
SNOWFLAKE_TYPES_MAP,
SPARK_SQL_TYPES_MAP,
TRINO_SQL_TYPES_MAP,
VERTICA_SQL_TYPES_MAP,
resolve_athena_modified_type,
resolve_postgres_modified_type,
resolve_trino_modified_type,
resolve_vertica_modified_type,
Expand Down Expand Up @@ -504,6 +506,7 @@ def get_upstream_lineage(upstream_urns: List[str]) -> UpstreamLineage:
**BIGQUERY_TYPES_MAP,
**SPARK_SQL_TYPES_MAP,
**TRINO_SQL_TYPES_MAP,
**ATHENA_SQL_TYPES_MAP,
**VERTICA_SQL_TYPES_MAP,
}

Expand All @@ -517,9 +520,11 @@ def get_column_type(
TypeClass: Any = _field_type_mapping.get(column_type)

if TypeClass is None:
# resolve modified type
# resolve a modified type
if dbt_adapter == "trino":
TypeClass = resolve_trino_modified_type(column_type)
elif dbt_adapter == "athena":
TypeClass = resolve_athena_modified_type(column_type)
elif dbt_adapter == "postgres" or dbt_adapter == "redshift":
# Redshift uses a variant of Postgres, so we can use the same logic.
TypeClass = resolve_postgres_modified_type(column_type)
Expand Down
37 changes: 33 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,16 @@ def resolve_trino_modified_type(type_string: str) -> Any:
if match:
modified_type_base: str = match.group(1)
return TRINO_SQL_TYPES_MAP[modified_type_base]
else:
return TRINO_SQL_TYPES_MAP[type_string]
return TRINO_SQL_TYPES_MAP[type_string]


def resolve_athena_modified_type(type_string: str) -> Any:
# for cases like timestamp(3), decimal(10,0)
match = re.match(r"([a-zA-Z]+)\(.+\)", type_string)
if match:
modified_type_base: str = match.group(1)
return ATHENA_SQL_TYPES_MAP[modified_type_base]
return ATHENA_SQL_TYPES_MAP[type_string]


def resolve_vertica_modified_type(type_string: str) -> Any:
Expand All @@ -243,8 +251,7 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
if match:
modified_type_base: str = match.group(1)
return VERTICA_SQL_TYPES_MAP[modified_type_base]
else:
return VERTICA_SQL_TYPES_MAP[type_string]
return VERTICA_SQL_TYPES_MAP[type_string]


# see https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html
Expand Down Expand Up @@ -353,6 +360,28 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
"array": ArrayType,
}

# https://docs.aws.amazon.com/athena/latest/ug/data-types.html
# https://github.com/dbt-athena/dbt-athena/tree/main
ATHENA_SQL_TYPES_MAP: Dict[str, Any] = {
"boolean": BooleanType,
"tinyint": NumberType,
"smallint": NumberType,
"int": NumberType,
"integer": NumberType,
"bigint": NumberType,
"float": NumberType,
"double": NumberType,
"decimal": NumberType,
"varchar": StringType,
"char": StringType,
"binary": BytesType,
"date": DateType,
"timestamp": TimeType,
"struct": RecordType,
"map": MapType,
"array": ArrayType,
}

# https://www.vertica.com/docs/11.1.x/HTML/Content/Authoring/SQLReferenceManual/DataTypes/SQLDataTypes.htm
VERTICA_SQL_TYPES_MAP: Dict[str, Any] = {
"binary": BytesType,
Expand Down
32 changes: 32 additions & 0 deletions metadata-ingestion/tests/integration/dbt/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from datahub.ingestion.source.dbt.dbt_common import DBTEntitiesEnabled, EmitDirective
from datahub.ingestion.source.dbt.dbt_core import DBTCoreConfig
from datahub.ingestion.source.sql.sql_types import (
ATHENA_SQL_TYPES_MAP,
TRINO_SQL_TYPES_MAP,
resolve_athena_modified_type,
resolve_trino_modified_type,
)
from tests.test_helpers import mce_helpers
Expand Down Expand Up @@ -306,6 +308,36 @@ def test_resolve_trino_modified_type(data_type, expected_data_type):
)


@pytest.mark.parametrize(
"data_type, expected_data_type",
[
("boolean", "boolean"),
("tinyint", "tinyint"),
("smallint", "smallint"),
("int", "int"),
("integer", "integer"),
("bigint", "bigint"),
("float", "float"),
("double", "double"),
("decimal(10,0)", "decimal"),
("varchar(20)", "varchar"),
("char", "char"),
("binary", "binary"),
("date", "date"),
("timestamp", "timestamp"),
("timestamp(3)", "timestamp"),
("struct(x bigint, y double)", "struct"),
("array(struct(x bigint, y double))", "array"),
("map(varchar, varchar)", "map"),
],
)
def test_resolve_athena_modified_type(data_type, expected_data_type):
assert (
resolve_athena_modified_type(data_type)
== ATHENA_SQL_TYPES_MAP[expected_data_type]
)


@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_dbt_tests_only_assertions(pytestconfig, tmp_path, mock_time, **kwargs):
Expand Down

0 comments on commit 6adb496

Please sign in to comment.