-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix (dbt): derived metrics #154
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,10 @@ | |
from preset_cli.api.clients.dbt import MetricSchema, ModelSchema | ||
from preset_cli.api.clients.superset import SupersetClient | ||
from preset_cli.api.operators import OneToMany | ||
from preset_cli.cli.superset.sync.dbt.metrics import get_metric_expression | ||
from preset_cli.cli.superset.sync.dbt.metrics import ( | ||
get_metric_expression, | ||
get_metrics_for_model, | ||
) | ||
|
||
_logger = logging.getLogger(__name__) | ||
|
||
|
@@ -109,9 +112,7 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma | |
|
||
dataset_metrics = [] | ||
model_metrics = { | ||
metric["name"]: metric | ||
for metric in metrics | ||
if model["unique_id"] in metric["depends_on"] | ||
Comment on lines
-112
to
-114
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was too naive, and didn't account for how derived metrics depend on other metrics, instead of depending on models. I replaced it with a smarter function |
||
metric["name"]: metric for metric in get_metrics_for_model(model, metrics) | ||
} | ||
for name, metric in model_metrics.items(): | ||
meta = metric.get("meta", {}) | ||
|
@@ -120,7 +121,10 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma | |
{ | ||
"expression": get_metric_expression(name, model_metrics), | ||
"metric_name": name, | ||
"metric_type": metric["type"], | ||
"metric_type": ( | ||
metric.get("type") # dbt < 1.3 | ||
or metric.get("calculation_method") # dbt >= 1.3 | ||
), | ||
"verbose_name": metric.get("label", name), | ||
"description": metric.get("description", ""), | ||
"extra": json.dumps(meta), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,12 +6,15 @@ | |
|
||
# pylint: disable=consider-using-f-string | ||
|
||
import logging | ||
from functools import partial | ||
from typing import Dict, List | ||
|
||
from jinja2 import Template | ||
|
||
from preset_cli.api.clients.dbt import FilterSchema, MetricSchema | ||
from preset_cli.api.clients.dbt import FilterSchema, MetricSchema, ModelSchema | ||
|
||
_logger = logging.getLogger(__name__) | ||
|
||
|
||
def get_metric_expression(metric_name: str, metrics: Dict[str, MetricSchema]) -> str: | ||
|
@@ -22,8 +25,16 @@ def get_metric_expression(metric_name: str, metrics: Dict[str, MetricSchema]) -> | |
raise Exception(f"Invalid metric {metric_name}") | ||
|
||
metric = metrics[metric_name] | ||
type_ = metric["type"] | ||
sql = metric["sql"] | ||
if "calculation_method" in metric: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: have it inline
|
||
# dbt >= 1.3 | ||
type_ = metric["calculation_method"] | ||
sql = metric["expression"] | ||
expression = "derived" | ||
else: | ||
# dbt < 1.3 | ||
type_ = metric["type"] | ||
sql = metric["sql"] | ||
expression = "expression" | ||
Comment on lines
+28
to
+37
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice |
||
|
||
if metric.get("filters"): | ||
sql = apply_filters(sql, metric["filters"]) | ||
|
@@ -43,7 +54,7 @@ def get_metric_expression(metric_name: str, metrics: Dict[str, MetricSchema]) -> | |
if type_ == "count_distinct": | ||
return f"COUNT(DISTINCT {sql})" | ||
|
||
if type_ == "expression": | ||
if type_ == expression: | ||
template = Template(sql) | ||
return template.render(metric=partial(get_metric_expression, metrics=metrics)) | ||
|
||
|
@@ -59,3 +70,47 @@ def apply_filters(sql: str, filters: List[FilterSchema]) -> str: | |
"{field} {operator} {value}".format(**filter_) for filter_ in filters | ||
) | ||
return f"CASE WHEN {condition} THEN {sql} END" | ||
|
||
|
||
def is_derived(metric: MetricSchema) -> bool: | ||
""" | ||
Return if the metric is derived. | ||
""" | ||
return ( | ||
metric.get("calculation_method") == "derived" # dbt >= 1.3 | ||
or metric.get("type") == "expression" # dbt < 1.3 | ||
) | ||
|
||
|
||
def get_metrics_for_model( | ||
model: ModelSchema, | ||
metrics: List[MetricSchema], | ||
) -> List[MetricSchema]: | ||
""" | ||
Given a list of metrics, return those that are based on a given model. | ||
""" | ||
metric_map = {metric["unique_id"]: metric for metric in metrics} | ||
related_metrics = [] | ||
|
||
for metric in metrics: | ||
parents = set() | ||
queue = [metric] | ||
while queue: | ||
node = queue.pop() | ||
depends_on = node["depends_on"] | ||
if is_derived(node): | ||
queue.extend(metric_map[parent] for parent in depends_on) | ||
Comment on lines
+101
to
+102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the crux of the PR: for derived metrics we need to look at the upstream metrics to find the parent models. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the metric list something from superset or something from the dbt manifest? |
||
else: | ||
parents.update(depends_on) | ||
|
||
if len(parents) > 1: | ||
_logger.warning( | ||
"Metric %s cannot be calculated because it depends on multiple models", | ||
metric["name"], | ||
) | ||
break | ||
|
||
if model["unique_id"] == parents.pop(): | ||
related_metrics.append(metric) | ||
|
||
return related_metrics |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a bug, but since the metric unique ID was not needed anywhere it was not caught. With the changes in this PR we use the metric ID to traverse the DAG upstream to find parent models.