Merge pull request #2 from Brigad/main

merge

shrodingers authored Jun 23, 2023
2 parents 4491a0d + a00c395 commit f1fddc5
Showing 7 changed files with 166 additions and 55 deletions.
19 changes: 17 additions & 2 deletions requirements.txt
@@ -1,12 +1,13 @@
#
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --no-annotate
#
-e file:.
aiohttp==3.8.3
aiosignal==1.2.0
alembic==1.11.1
appdirs==1.4.4
async-timeout==4.0.2
attrs==22.1.0
@@ -17,19 +18,30 @@ charset-normalizer==2.0.12
click==8.1.2
commonmark==0.9.1
cython==0.29.28
databricks-sql-connector==2.6.1
et-xmlfile==1.1.0
frozenlist==1.3.1
future==0.18.3
greenlet==1.1.3.post0
idna==3.3
importlib-metadata==6.6.0
importlib-resources==5.12.0
jinja2==3.1.1
lz4==4.3.2
mako==1.2.4
markupsafe==2.1.1
marshmallow==3.17.0
multidict==6.0.2
numpy==1.22.3
oauthlib==3.2.2
openpyxl==3.1.2
packaging==21.3
pandas==1.4.2
prison==0.2.1
prompt-toolkit==3.0.29
pyarrow==12.0.0
pygments==2.12.0
pyhive==0.6.5
pyparsing==3.0.9
python-dateutil==2.8.2
python-graphql-client==0.4.3
@@ -40,10 +52,13 @@ rich==12.3.0
six==1.16.0
soupsieve==2.3.2.post1
sqlalchemy==1.4.35
sqlalchemy-databricks==0.2.0
sqlparse==0.4.2
tabulate==0.8.9
thrift==0.16.0
typing-extensions==4.2.0
urllib3==1.26.9
wcwidth==0.2.5
websockets==10.3
yarl==1.7.2
zipp==3.15.0
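
Most of the new pins appear to back Databricks connectivity: databricks-sql-connector supplies the driver and sqlalchemy-databricks the SQLAlchemy dialect, with packages such as alembic, lz4, oauthlib, openpyxl, and pyarrow likely arriving as transitive requirements. A minimal sketch of how an engine could be built against these pins; the host, token, and HTTP path are placeholders, not values from this commit:

# Minimal sketch, assuming a Databricks SQL endpoint; all credentials and
# paths below are placeholders.
from sqlalchemy import create_engine

engine = create_engine(
    "databricks+connector://token:<access-token>@<workspace-host>:443/default",
    connect_args={"http_path": "<sql-endpoint-http-path>"},
)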
1 change: 1 addition & 0 deletions setup.cfg
@@ -71,6 +71,7 @@ install_requires =
requests>=2.26.0
rich>=12.3.0
sqlalchemy>=1.4
sqlalchemy-databricks>=0.2.0
sqlparse>=0.4.2
tabulate>=0.8.9
typing-extensions>=4.0.1
3 changes: 2 additions & 1 deletion src/preset_cli/api/clients/dbt.py
@@ -553,7 +553,7 @@ class MetricSchema(PostelSchema, VersionnedSchema):
"""
Schema for a metric.
"""

depends_on = fields.List(fields.String(), data_key="dependsOn")
description = fields.String()
filters = fields.List(fields.Nested(FilterSchema))
@@ -563,6 +563,7 @@ class MetricSchema(PostelSchema, VersionnedSchema):
unique_id = fields.String(data_key="uniqueId")
expression = fields.String()
calculation_method = fields.String()
metrics = fields.List(fields.List(fields.String()))

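The new metrics field on MetricSchema mirrors how dbt describes derived metrics: each entry is itself a list of parent-metric names, hence the nested fields.List. A hypothetical payload (field values invented, and assuming the schema tolerates a minimal document) would deserialize like this:

# Hypothetical payload; "metrics" as a list of name lists is an assumption
# about the dbt metadata shape, not taken from this commit.
payload = {
    "uniqueId": "metric.jaffle_shop.profit",
    "dependsOn": ["metric.jaffle_shop.revenue", "metric.jaffle_shop.cost"],
    "metrics": [["revenue"], ["cost"]],
}
metric = MetricSchema().load(payload)
assert metric["metrics"] == [["revenue"], ["cost"]]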


82 changes: 62 additions & 20 deletions src/preset_cli/cli/superset/sync/dbt/datasets.py
@@ -52,8 +52,7 @@ def create_dataset(
else:
engine = create_engine(url)
quote = engine.dialect.identifier_preparer.quote
source = ".".join(quote(model[key])
for key in ("database", "schema", "alias"))
source = ".".join(quote(model[key]) for key in ("database", "schema", "alias"))
kwargs = {
"database": database["id"],
"schema": model["schema"],
@@ -82,10 +81,16 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
for model in models:
filters = {
"database": OneToMany(database["id"]),
"schema": model["schema"],
"table_name": model.get("alias") or model["name"],
}
existing = client.get_datasets(**filters)
if len(existing) > 1:
unique_id = model["unique_id"]
existing = [
item
for item in existing
if unique_id == json.loads(item["extra"])["unique_id"]
]
if len(existing) > 1:
raise Exception("More than one dataset found")

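A table-name lookup can now match datasets produced by more than one dbt model, so before raising, the sync narrows the candidates by the dbt unique_id that each synced dataset carries in its extra JSON. The same filter, run on made-up data:

# Made-up datasets illustrating the unique_id disambiguation from the diff.
import json

existing = [
    {"id": 1, "extra": json.dumps({"unique_id": "model.jaffle_shop.orders"})},
    {"id": 2, "extra": json.dumps({"unique_id": "model.staging.orders"})},
]
unique_id = "model.jaffle_shop.orders"
existing = [
    item
    for item in existing
    if unique_id == json.loads(item["extra"])["unique_id"]
]
assert [item["id"] for item in existing] == [1]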
@@ -110,21 +115,52 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma

dataset_info = client.get_dataset(dataset["id"])
existing_metrics = dataset_info["metrics"]
metric_keys = ['d3format',
'description',
'expression',
'extra',
'metric_name',
'metric_type',
'verbose_name',
'warning_text']
metric_keys = [
"d3format",
"description",
"expression",
"extra",
"metric_name",
"metric_type",
"verbose_name",
"warning_text",
]

def get_deps_metrics(metric):
metrics_len = len(metric["metrics"]) or 0
if metrics_len > 0:
result = [
get_deps_metrics(
[
metric_value
for metric_value in metrics
if metric_value["name"] == real_metric
][0]
)
for sub_metric in metric.get("metrics")
for real_metric in sub_metric
]
return metric.get("depends_on") + [
elem for all in result for elem in all
]
return metric.get("depends_on")

model_metrics = {
metric["name"]: metric
for metric in metrics
if model["unique_id"] in metric["depends_on"]
if model["unique_id"] in get_deps_metrics(metric)
}
model_metrics_names = [dbt_metric["name"] for dbt_metric in metrics]
dataset_metrics = [{key: value for key, value in metric.items() if key in metric_keys} for metric in existing_metrics if metric["metric_name"] != 'count' and metric["metric_name"] not in model_metrics_names] if existing_metrics else []
dataset_metrics = (
[
{key: value for key, value in metric.items() if key in metric_keys}
for metric in existing_metrics
if metric["metric_name"] != "count"
and metric["metric_name"] not in model_metrics_names
]
if existing_metrics
else []
)
for name, metric in model_metrics.items():
meta = metric.get("meta", {})
kwargs = meta.pop("superset", {})
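get_deps_metrics addresses derived metrics, whose depends_on may reference only parent metrics rather than the underlying model; attaching such a metric to its dataset requires walking the parent chain. A simplified, self-contained version of the idea (metric definitions invented):

# Simplified recursive dependency walk; data is invented for illustration.
metrics = [
    {"name": "revenue", "depends_on": ["model.shop.sales"], "metrics": []},
    {
        "name": "profit",
        "depends_on": ["metric.shop.revenue"],
        "metrics": [["revenue"]],
    },
]
by_name = {m["name"]: m for m in metrics}

def all_model_deps(metric):
    deps = list(metric["depends_on"])
    for group in metric["metrics"]:  # each group lists parent-metric names
        for parent_name in group:
            deps.extend(all_model_deps(by_name[parent_name]))
    return deps

# "profit" reaches model.shop.sales only through its parent "revenue".
assert "model.shop.sales" in all_model_deps(by_name["profit"])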
@@ -133,7 +169,7 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
"expression": get_metric_expression(name, model_metrics),
"metric_name": name,
"metric_type": metric.get("type") # dbt < 1.3
or metric.get("calculation_method"), # dbt >= 1.3
or metric.get("calculation_method"), # dbt >= 1.3
"verbose_name": metric.get("label", name),
"description": metric.get("description", ""),
"extra": json.dumps(meta),
@@ -144,6 +180,7 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
# update dataset clearing metrics...
update = {
"description": model.get("description", ""),
"schema": model["schema"],
"extra": json.dumps(extra),
"is_managed_externally": disallow_edits,
"metrics": [],
@@ -159,20 +196,25 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
update = {
"metrics": dataset_metrics,
}
client.update_dataset(
dataset["id"], override_columns=False, **update)
client.update_dataset(dataset["id"], override_columns=False, **update)

# update column descriptions
update = {
"columns": [
{"column_name": name, "description": column.get("description", ""), "is_dttm": column["data_type"] == "timestamp" if not column.get(
"meta", {}).get("superset", {}).get("is_dttm", False) else False}
{
"column_name": name,
"description": column.get("description", ""),
"is_dttm": column["data_type"] == "timestamp"
if not column.get("meta", {})
.get("superset", {})
.get("is_dttm", False)
else False,
}
for name, column in model.get("columns", {}).items()
],
}
if update["columns"]:
client.update_dataset(
dataset["id"], override_columns=True, **update)
client.update_dataset(dataset["id"], override_columns=True, **update)

datasets.append(dataset)

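The reflowed column update keeps the earlier rule: a column is flagged temporal when its dbt data_type is timestamp, unless an explicit meta.superset.is_dttm override is present, in which case the computed flag is suppressed. The rule in isolation, with hypothetical column metadata:

# Hypothetical column metadata exercising the is_dttm rule from the diff.
column = {"data_type": "timestamp", "meta": {}}
override = column.get("meta", {}).get("superset", {}).get("is_dttm", False)
is_dttm = column["data_type"] == "timestamp" if not override else False
assert is_dttm is True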
19 changes: 12 additions & 7 deletions src/preset_cli/cli/superset/sync/dbt/exposures.py
@@ -56,7 +56,8 @@ def get_dashboard_depends_on(
Get all the dbt dependencies for a given dashboard.
"""

url = client.baseurl / "api/v1/dashboard" / str(dashboard["id"]) / "datasets"
url = client.baseurl / "api/v1/dashboard" / \
str(dashboard["id"]) / "datasets"

session = client.auth.session
headers = client.auth.get_headers()
@@ -96,7 +97,8 @@ def sync_exposures( # pylint: disable=too-many-locals
dashboards_ids = set()

for dataset in datasets:
url = client.baseurl / "api/v1/dataset" / str(dataset["id"]) / "related_objects"
url = client.baseurl / "api/v1/dataset" / \
str(dataset["id"]) / "related_objects"

session = client.auth.session
headers = client.auth.get_headers()
@@ -111,7 +113,8 @@

for chart_id in charts_ids:
chart = client.get_chart(chart_id)
first_owner = chart["owners"][0]
first_owner = chart["owners"][0] if len(chart["owners"]) else {
"first_name": "Unknown", "last_name": "Owner"}
exposure = {
"name": chart["slice_name"] + " [chart]",
"type": "analysis",
@@ -122,7 +125,7 @@
% {"form_data": json.dumps({"slice_id": chart_id})},
),
"description": chart["description"] or "",
"meta": { "id": chart_id },
"meta": {"id": chart_id},
"depends_on": get_chart_depends_on(client, chart, model_map),
"owner": {
"name": first_owner["first_name"] + " " + first_owner["last_name"],
@@ -133,7 +136,8 @@

for dashboard_id in dashboards_ids:
dashboard = client.get_dashboard(dashboard_id)
first_owner = dashboard["owners"][0]
first_owner = dashboard["owners"][0] if len(dashboard["owners"]) else {
"first_name": "Unknown", "last_name": "Owner"}
exposure = {
"name": dashboard["dashboard_title"] + " [dashboard]",
"type": "dashboard",
Expand All @@ -142,7 +146,7 @@ def sync_exposures( # pylint: disable=too-many-locals
else "low",
"url": str(client.baseurl / dashboard["url"].lstrip("/")),
"description": "",
"meta": { "id": f"d_{dashboard_id}" },
"meta": {"id": f"d_{dashboard_id}"},
"depends_on": get_dashboard_depends_on(client, dashboard, model_map),
"owner": {
"name": first_owner["first_name"] + " " + first_owner["last_name"],
@@ -152,4 +156,5 @@
exposures.append(exposure)

with open(exposures_path, "w", encoding="utf-8") as output:
yaml.safe_dump({"version": 2, "exposures": exposures}, output, sort_keys=False)
yaml.safe_dump({"version": 2, "exposures": exposures},
output, sort_keys=False)
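
Both exposure loops now tolerate charts and dashboards with an empty owners list, which the old owners[0] indexing turned into an IndexError; a placeholder owner is used instead. Reduced to its core, with an invented payload:

# Invented chart payload demonstrating the owner fallback from the diff.
chart = {"owners": []}
first_owner = chart["owners"][0] if len(chart["owners"]) else {
    "first_name": "Unknown", "last_name": "Owner"}
assert first_owner == {"first_name": "Unknown", "last_name": "Owner"}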