Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ignoring of certain upstream lineage platforms #15

Merged
merged 3 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,9 @@ class TableauConfig(
description="[Experimental] Whether to extract lineage from unsupported custom sql queries using SQL parsing",
)

lineage_platform_instance: str = Field(
default="uk",
description="Platform prefix for mapping upstream lineage URNs.",
ignore_upstream_lineage_platforms: Optional[str] = Field(
default="",
description="Comma separated list of platforms to not ingest upstream lineage for",
)

# pre = True because we want to take some decision before pydantic initialize the configuration to default values
Expand Down Expand Up @@ -402,6 +402,7 @@ class TableauSource(StatefulIngestionSourceBase):
tableau_project_registry: Dict[str, TableauProject] = {}
workbook_project_map: Dict[str, str] = {}
datasource_project_map: Dict[str, str] = {}
ignore_upstream_lineage_platforms: List[str] = []

def __hash__(self):
return id(self)
Expand Down Expand Up @@ -438,6 +439,12 @@ def __init__(
# when emitting custom SQL data sources.
self.custom_sql_ids_being_used: List[str] = []

if self.config.ignore_upstream_lineage_platforms:
self.ignore_upstream_lineage_platforms = [
x.strip()
for x in (self.config.ignore_upstream_lineage_platforms.split(","))
]

self._authenticate()

def close(self) -> None:
Expand Down Expand Up @@ -917,6 +924,15 @@ def get_upstream_tables(self, tables, datasource_name, browse_path, is_custom_sq
# Same table urn can be used when setting fine grained lineage,
table_id_to_urn: Dict[str, str] = {}
for table in tables:
if (
table.get(tableau_constant.CONNECTION_TYPE, "")
in self.ignore_upstream_lineage_platforms
):
logger.debug(
f"Skipping upstream table {table[tableau_constant.ID]}, ignoring upstream platform {table.get(tableau_constant.CONNECTION_TYPE, '')}"
)
continue

# skip upstream tables when there is no column info when retrieving datasource
# Lineage and Schema details for these will be taken care in self.emit_custom_sql_datasources()
if not is_custom_sql and not table.get(tableau_constant.COLUMNS):
Expand Down Expand Up @@ -966,7 +982,6 @@ def get_upstream_tables(self, tables, datasource_name, browse_path, is_custom_sq
table_name,
self.config.platform_instance_map,
self.config.lineage_overrides,
self.config.lineage_platform_instance
)
table_id_to_urn[table[tableau_constant.ID]] = table_urn

Expand Down Expand Up @@ -1379,8 +1394,8 @@ def _create_lineage_from_unsupported_csql(
query = clean_query(csql.get(tableau_constant.QUERY))

# suppress sqlfluff logging because it is very spammy and lags the Airflow UI
logging.getLogger('sqlfluff.parser').setLevel(logging.WARNING)
logging.getLogger('sqlfluff.linter').setLevel(logging.WARNING)
logging.getLogger("sqlfluff.parser").setLevel(logging.WARNING)
logging.getLogger("sqlfluff.linter").setLevel(logging.WARNING)

parser = LineageRunner(query)
try:
Expand All @@ -1397,7 +1412,6 @@ def _create_lineage_from_unsupported_csql(
full_name=split_table[1],
platform_instance_map=self.config.platform_instance_map,
lineage_overrides=self.config.lineage_overrides,
lineage_platform_instance=self.config.lineage_platform_instance
)
upstream_tables.append(
UpstreamClass(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,6 @@ def get_fully_qualified_table_name(
upstream_db: str,
schema: str,
full_name: str,
lineage_platform_instance: Optional[str] = None,
) -> str:
if platform == "athena":
upstream_db = ""
Expand Down Expand Up @@ -560,9 +559,6 @@ def get_fully_qualified_table_name(
fully_qualified_table_name.split(".")[-3:]
)

if lineage_platform_instance:
fully_qualified_table_name = f"{lineage_platform_instance}.{fully_qualified_table_name}"

return fully_qualified_table_name


Expand All @@ -583,7 +579,6 @@ def make_table_urn(
full_name: str,
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
lineage_platform_instance: Optional[str] = None,
) -> str:
original_platform = platform = get_platform(connection_type)
if (
Expand All @@ -602,7 +597,7 @@ def make_table_urn(
upstream_db = lineage_overrides.database_override_map[upstream_db]

table_name = get_fully_qualified_table_name(
original_platform, upstream_db, schema, full_name, lineage_platform_instance
original_platform, upstream_db, schema, full_name
)
platform_instance = get_platform_instance(original_platform, platform_instance_map)
return builder.make_dataset_urn_with_platform_instance(
Expand Down
Loading