Skip to content

Commit

Permalink
Add info logs
Browse files Browse the repository at this point in the history
  • Loading branch information
maiarareinaldo committed Mar 7, 2024
1 parent 8f2a7d4 commit d78a502
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,11 +990,21 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No
preprocessed_sql, schema_resolver=schema_resolver
)

if sql_result:
if not sql_result.column_lineage or len(sql_result.column_lineage) == 0:
logger.info(f"Returned sql result but no column lineage for {node.dbt_name}")
else:
logger.info(f"No sql result returned for {node.dbt_name}")

# Save the column lineage.
if self.config.include_column_lineage and sql_result:
# We only save the debug info here. We're report errors based on it later, after
# applying the configured node filters.
node.cll_debug_info = sql_result.debug_info
if not sql_result.column_lineage or len(sql_result.column_lineage) == 0:
logger.info(f"No column lineage returned for {node.dbt_name}")
else:
logger.info(f"Column lineage returned for {len(sql_result.column_lineage)} columns in {node.dbt_name}")

if sql_result.column_lineage:
node.upstream_cll = [
Expand Down
19 changes: 13 additions & 6 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,6 @@ def _schema_aware_fuzzy_column_resolve(
upstreams=[],
)
)

return column_lineage

# Try to figure out the types of the output columns.
Expand Down Expand Up @@ -756,7 +755,10 @@ def _schema_aware_fuzzy_column_resolve(
}

if not direct_resolved_col_upstreams:
logger.debug(f' "{output_col}" has no upstreams')
logger.info(f' "{output_col}" has no upstreams')
else:
for upstream in direct_resolved_col_upstreams:
logger.info(f' "{output_col}" <- "{upstream.table}.{upstream.column}"')
column_lineage.append(
_ColumnLineageInfo(
downstream=_DownstreamColumnRef(
Expand All @@ -775,6 +777,7 @@ def _schema_aware_fuzzy_column_resolve(
f"sqlglot failed to compute some lineage: {e}"
) from e

logger.info(f"Column lineage length: {len(column_lineage)}")
return column_lineage


Expand Down Expand Up @@ -1019,6 +1022,10 @@ def _sqlglot_lineage_inner(
# Generate table-level lineage.
tables, modified = _table_level_lineage(statement, dialect=dialect)

logger.info("Completed table level lineage generation")
if len(tables) == 0:
logger.info("No tables discovered in the sql statement")

# Prep for generating column-level lineage.
downstream_table: Optional[_TableName] = None
if len(modified) == 1:
Expand Down Expand Up @@ -1053,15 +1060,15 @@ def _sqlglot_lineage_inner(
tables_discovered=total_tables_discovered,
table_schemas_resolved=total_schemas_resolved,
)
logger.debug(
logger.info(
f"Resolved {len(table_name_schema_mapping)} of {len(tables)} table schemas"
)

# Simplify the input statement for column-level lineage generation.
try:
select_statement = _try_extract_select(statement)
except Exception as e:
logger.debug(f"Failed to extract select from statement: {e}", exc_info=True)
logger.info(f"Failed to extract select from statement: {e}", exc_info=True)
debug_info.column_error = e
select_statement = None

Expand All @@ -1081,9 +1088,9 @@ def _sqlglot_lineage_inner(
# Inject details about the outer statement type too.
e.args = (f"{e.args[0]} (outer statement type: {type(statement)})",)
debug_info.column_error = e
logger.debug(debug_info.column_error)
logger.info(debug_info.column_error)
except SqlUnderstandingError as e:
logger.debug(f"Failed to generate column-level lineage: {e}", exc_info=True)
logger.info(f"Failed to generate column-level lineage: {e}", exc_info=True)
debug_info.column_error = e

# TODO: Can we generate a common JOIN tables / keys section?
Expand Down

0 comments on commit d78a502

Please sign in to comment.