Skip to content

Commit

Permalink
Allow ignoring of certain upstream lineage platforms (#15)
Browse files Browse the repository at this point in the history
* Allow ignoring of certain upstream lineage platforms

* Handle whitespace

* Remove lineage_platform_instance
  • Loading branch information
luos-fc authored and maiarareinaldo committed Feb 19, 2024
1 parent 3271612 commit 590a3b8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
22 changes: 18 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ class TableauConfig(
description="[Experimental] Whether to extract lineage from unsupported custom sql queries using SQL parsing",
)

lineage_platform_instance: str = Field(
default="uk",
description="Platform instance for mapping upstream lineage URNs.",
ignore_upstream_lineage_platforms: Optional[str] = Field(
default="",
description="Comma separated list of platforms to not ingest upstream lineage for",
)

# pre = True because we want to make some decisions before pydantic initializes the configuration to its default values
Expand Down Expand Up @@ -499,6 +499,12 @@ def __init__(
# when emitting custom SQL data sources.
self.custom_sql_ids_being_used: List[str] = []

if self.config.ignore_upstream_lineage_platforms:
self.ignore_upstream_lineage_platforms = [
x.strip()
for x in (self.config.ignore_upstream_lineage_platforms.split(","))
]

self._authenticate()

def close(self) -> None:
Expand Down Expand Up @@ -975,6 +981,15 @@ def get_upstream_tables(
# Same table urn can be used when setting fine grained lineage,
table_id_to_urn: Dict[str, str] = {}
for table in tables:
if (
table.get(tableau_constant.CONNECTION_TYPE, "")
in self.ignore_upstream_lineage_platforms
):
logger.debug(
f"Skipping upstream table {table[tableau_constant.ID]}, ignoring upstream platform {table.get(tableau_constant.CONNECTION_TYPE, '')}"
)
continue

# Skip upstream tables that have no column info when retrieving the datasource.
# Lineage and schema details for these will be taken care of in self.emit_custom_sql_datasources()
num_tbl_cols: Optional[int] = table.get(c.COLUMNS_CONNECTION) and table[
Expand Down Expand Up @@ -1004,7 +1019,6 @@ def get_upstream_tables(
self.config.env,
self.config.platform_instance_map,
self.config.lineage_overrides,
self.config.lineage_platform_instance
)
table_id_to_urn[table[c.ID]] = table_urn

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,6 @@ def get_fully_qualified_table_name(
upstream_db: str,
schema: str,
table_name: str,
lineage_platform_instance: Optional[str] = None,
) -> str:
if platform == "athena":
upstream_db = ""
Expand Down Expand Up @@ -576,9 +575,6 @@ def get_fully_qualified_table_name(
fully_qualified_table_name.split(".")[-3:]
)

if lineage_platform_instance:
fully_qualified_table_name = f"{lineage_platform_instance}.{fully_qualified_table_name}"

return fully_qualified_table_name


Expand Down

0 comments on commit 590a3b8

Please sign in to comment.