def process_sl_metrics(
    dbt_client: DBTClient,
    environment_id: int,
    model_map: Dict[ModelKey, ModelSchema],
) -> Optional[List[MFMetricWithSQLSchema]]:
    """
    Collect the semantic layer metrics that can be tied to a known model.

    For every metric reported by ``dbt_client.get_sl_metrics(environment_id)``
    the SQL is requested via ``dbt_client.get_sl_metric_sql``. A metric is
    dropped when no SQL comes back (``None``) or when ``get_model_from_sql``
    raises ``MultipleModelsError``; every other metric is loaded through
    ``MFMetricWithSQLSchema`` together with its SQL, the dialect value and the
    ``unique_id`` of the model it resolved to.

    The ``dbt_cloud`` command calls this helper with the job's
    ``environment_id`` and the ``ModelKey`` -> model mapping it built.

    :param dbt_client: client used to query the semantic layer.
    :param environment_id: environment whose dialect and metrics are fetched.
    :param model_map: lookup handed to ``get_model_from_sql`` to resolve the
        model a metric's SQL refers to.
    :returns: the loaded metrics, in the order the client returned them.
    """
    sl_dialect = dbt_client.get_sl_dialect(environment_id)
    schema = MFMetricWithSQLSchema()
    mapped: List[MFMetricWithSQLSchema] = []

    for entry in dbt_client.get_sl_metrics(environment_id):
        compiled_sql = dbt_client.get_sl_metric_sql(entry["name"], environment_id)
        if compiled_sql is None:
            # The client returned no SQL for this metric; nothing to map.
            continue

        try:
            source_model = get_model_from_sql(compiled_sql, sl_dialect, model_map)
        except MultipleModelsError:
            # Presumably the SQL touches more than one model (inferred from the
            # exception name -- confirm); such metrics are skipped.
            continue
        else:
            payload = {
                "name": entry["name"],
                "type": entry["type"],
                "description": entry["description"],
                "sql": compiled_sql,
                "dialect": sl_dialect.value,
                "model": source_model["unique_id"],
            }
            mapped.append(schema.load(payload))

    return mapped