diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index a8b4f3177d162..04e494d9a6255 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -990,11 +990,21 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No preprocessed_sql, schema_resolver=schema_resolver ) + if sql_result: + if not sql_result.column_lineage or len(sql_result.column_lineage) == 0: + logger.info(f"Returned sql result but no column lineage for {node.dbt_name}") + else: + logger.info(f"No sql result returned for {node.dbt_name}") + # Save the column lineage. if self.config.include_column_lineage and sql_result: # We only save the debug info here. We're report errors based on it later, after # applying the configured node filters. node.cll_debug_info = sql_result.debug_info + if not sql_result.column_lineage or len(sql_result.column_lineage) == 0: + logger.info(f"No column lineage returned for {node.dbt_name}") + else: + logger.info(f"Column lineage returned for {len(sql_result.column_lineage)} columns in {node.dbt_name}") if sql_result.column_lineage: node.upstream_cll = [ diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index 5837a2be613a2..396c60fc873bc 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -660,7 +660,6 @@ def _schema_aware_fuzzy_column_resolve( upstreams=[], ) ) - return column_lineage # Try to figure out the types of the output columns. @@ -756,7 +755,10 @@ def _schema_aware_fuzzy_column_resolve( } if not direct_resolved_col_upstreams: - logger.debug(f' "{output_col}" has no upstreams') + logger.info(f' "{output_col}" has no upstreams') + else: + for upstream in direct_resolved_col_upstreams: + logger.info(f' "{output_col}" <- "{upstream.table}.{upstream.column}"') column_lineage.append( _ColumnLineageInfo( downstream=_DownstreamColumnRef( @@ -775,6 +777,7 @@ def _schema_aware_fuzzy_column_resolve( f"sqlglot failed to compute some lineage: {e}" ) from e + logger.info(f"Column lineage length: {len(column_lineage)}") return column_lineage @@ -1019,6 +1022,10 @@ def _sqlglot_lineage_inner( # Generate table-level lineage. tables, modified = _table_level_lineage(statement, dialect=dialect) + logger.info("Completed table level lineage generation") + if len(tables) == 0: + logger.info("No tables discovered in the sql statement") + # Prep for generating column-level lineage. downstream_table: Optional[_TableName] = None if len(modified) == 1: @@ -1053,7 +1060,7 @@ def _sqlglot_lineage_inner( tables_discovered=total_tables_discovered, table_schemas_resolved=total_schemas_resolved, ) - logger.debug( + logger.info( f"Resolved {len(table_name_schema_mapping)} of {len(tables)} table schemas" ) @@ -1061,7 +1068,7 @@ def _sqlglot_lineage_inner( try: select_statement = _try_extract_select(statement) except Exception as e: - logger.debug(f"Failed to extract select from statement: {e}", exc_info=True) + logger.info(f"Failed to extract select from statement: {e}", exc_info=True) debug_info.column_error = e select_statement = None @@ -1081,9 +1088,9 @@ def _sqlglot_lineage_inner( # Inject details about the outer statement type too. e.args = (f"{e.args[0]} (outer statement type: {type(statement)})",) debug_info.column_error = e - logger.debug(debug_info.column_error) + logger.info(debug_info.column_error) except SqlUnderstandingError as e: - logger.debug(f"Failed to generate column-level lineage: {e}", exc_info=True) + logger.info(f"Failed to generate column-level lineage: {e}", exc_info=True) debug_info.column_error = e # TODO: Can we generate a common JOIN tables / keys section?