From 590a3b8a87aec5b3cc2c73e6bac8b2aba54d3518 Mon Sep 17 00:00:00 2001 From: Louis Mackie <88324241+luos-fc@users.noreply.github.com> Date: Fri, 11 Aug 2023 16:39:28 +0100 Subject: [PATCH] Allow ignoring of certain upstream lineage platforms (#15) * Allow ignoring of certain upstream lineage platforms * Handle whitespace * Remove lineage_platform_instance --- .../src/datahub/ingestion/source/tableau.py | 22 +++++++++++++++---- .../ingestion/source/tableau_common.py | 4 ---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index c9db9e4e6d2b3..f1fd85f187408 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -353,9 +353,9 @@ class TableauConfig( description="[Experimental] Whether to extract lineage from unsupported custom sql queries using SQL parsing", ) - lineage_platform_instance: str = Field( - default="uk", - description="Platform instance for mapping upstream lineage URNs.", + ignore_upstream_lineage_platforms: Optional[str] = Field( + default="", + description="Comma separated list of platforms to not ingest upstream lineage for", ) # pre = True because we want to take some decision before pydantic initialize the configuration to default values @@ -499,6 +499,12 @@ def __init__( # when emitting custom SQL data sources. self.custom_sql_ids_being_used: List[str] = [] + if self.config.ignore_upstream_lineage_platforms: + self.ignore_upstream_lineage_platforms = [ + x.strip() + for x in (self.config.ignore_upstream_lineage_platforms.split(",")) + ] + self._authenticate() def close(self) -> None: @@ -975,6 +981,15 @@ def get_upstream_tables( # Same table urn can be used when setting fine grained lineage, table_id_to_urn: Dict[str, str] = {} for table in tables: + if ( + table.get(tableau_constant.CONNECTION_TYPE, "") + in self.ignore_upstream_lineage_platforms + ): + logger.debug( + f"Skipping upstream table {table[tableau_constant.ID]}, ignoring upstream platform {table.get(tableau_constant.CONNECTION_TYPE, '')}" + ) + continue + # skip upstream tables when there is no column info when retrieving datasource # Lineage and Schema details for these will be taken care in self.emit_custom_sql_datasources() num_tbl_cols: Optional[int] = table.get(c.COLUMNS_CONNECTION) and table[ @@ -1004,7 +1019,6 @@ def get_upstream_tables( self.config.env, self.config.platform_instance_map, self.config.lineage_overrides, - self.config.lineage_platform_instance ) table_id_to_urn[table[c.ID]] = table_urn diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py index 59720913a53f4..65d779b7f4516 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py @@ -543,7 +543,6 @@ def get_fully_qualified_table_name( upstream_db: str, schema: str, table_name: str, - lineage_platform_instance: Optional[str] = None, ) -> str: if platform == "athena": upstream_db = "" @@ -576,9 +575,6 @@ def get_fully_qualified_table_name( fully_qualified_table_name.split(".")[-3:] ) - if lineage_platform_instance: - fully_qualified_table_name = f"{lineage_platform_instance}.{fully_qualified_table_name}" - return fully_qualified_table_name