fix(dbt-sync): Migrate legacy endpoints to fix virtual dataset creation #232

Merged: 5 commits, Aug 2, 2023
Changes from 3 commits
63 changes: 57 additions & 6 deletions src/preset_cli/api/clients/superset.py
@@ -43,6 +43,7 @@

import pandas as pd
import prison
import requests
import yaml
from bs4 import BeautifulSoup
from yarl import URL
@@ -247,7 +248,7 @@ def _run_query(
schema: Optional[str] = None,
limit: int = 1000,
) -> Dict[str, Any]:
url = self.baseurl / "superset/sql_json/"
url = self.baseurl / "api/v1/sqllab/execute/"
data = {
"client_id": shortid()[:10],
"database_id": database_id,
@@ -268,10 +269,19 @@ def _run_query(
}
self.session.headers.update(headers)

_logger.debug("POST %s\n%s", url, json.dumps(data, indent=4))
response = self.session.post(url, json=data)
validate_response(response)
+        try:
+            _logger.debug("POST %s\n%s", url, json.dumps(data, indent=4))
+            response = self.session.post(url, json=data)
+            response.raise_for_status()
+
+        # Legacy superset installations don't have the SQL API endpoint yet
+        except requests.exceptions.HTTPError as err:
+            if err.response.status_code == 404:
+                url = self.baseurl / "superset/sql_json/"
+                _logger.debug("POST %s\n%s", url, json.dumps(data, indent=4))
+                response = self.session.post(url, json=data)

+        validate_response(response)
payload = response.json()

return payload
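For context, here is a minimal standalone sketch of the fallback pattern introduced above: try the v1 SQL Lab endpoint first, and retry against the legacy endpoint on a 404. The `session` and `baseurl` parameters are illustrative stand-ins for the client's attributes, and unlike the real method, non-404 HTTP errors are re-raised here instead of being left to `validate_response`:

```python
import requests
from yarl import URL


def post_with_legacy_fallback(
    session: requests.Session,
    baseurl: URL,
    data: dict,
) -> requests.Response:
    """POST to the v1 SQL Lab endpoint, falling back to the legacy one on 404."""
    try:
        response = session.post(str(baseurl / "api/v1/sqllab/execute/"), json=data)
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 404:
            raise
        # Legacy Superset installations predate /api/v1/sqllab/execute/
        response = session.post(str(baseurl / "superset/sql_json/"), json=data)
    return response
```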
@@ -473,6 +483,37 @@ def update_resource(

return resource

def get_resource_endpoint_info(self, resource_name: str, **kwargs: Any) -> Any:
"""
        Get resource endpoint info (such as available columns), optionally filtered by requested keys.
"""
query = prison.dumps({"keys": list(kwargs["keys"])} if "keys" in kwargs else {})

url = self.baseurl / "api/v1" / resource_name / "_info" % {"q": query}
_logger.debug("GET %s", url)
response = self.session.get(url)
validate_response(response)

endpoint_info = response.json()

return endpoint_info

def validate_key_in_resource_schema(
self, resource_name: str, key_name: str, **kwargs: Any
) -> Any:
"""
        Check whether a key is present in a resource schema.
"""
schema_validation = {}

endpoint_info = self.get_resource_endpoint_info(resource_name, **kwargs)

for key in kwargs.get("keys", ["add_columns", "edit_columns"]):
schema_columns = [column["name"] for column in endpoint_info.get(key, [])]
schema_validation[key] = key_name in schema_columns

return schema_validation
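
For a rough sense of how these two helpers behave, a hedged sketch follows; the Rison encoding is standard `prison` behavior, the endpoint shape and key names come from the diff, and the client usage in the comments assumes an already-authenticated instance:

```python
import prison

# get_resource_endpoint_info builds a Rison-encoded "q" query parameter:
query = prison.dumps({"keys": ["add_columns"]})
print(query)  # -> (keys:!(add_columns))
# so the request becomes (host illustrative):
#   GET https://superset.example.org/api/v1/dataset/_info?q=(keys:!(add_columns))

# With an authenticated SupersetClient instance (construction omitted here),
# the legacy check used by create_dataset would then look like:
#   client.validate_key_in_resource_schema("dataset", "sql", keys=["add_columns"])
#   # e.g. {"add_columns": True} when POST /api/v1/dataset/ accepts "sql"
```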

def get_database(self, database_id: int) -> Any:
"""
Return a single database.
@@ -526,6 +567,16 @@ def create_dataset(self, **kwargs: Any) -> Any:
if "sql" not in kwargs:
return self.create_resource("dataset", **kwargs)

# Check if the dataset creation supports sql directly
not_legacy = self.validate_key_in_resource_schema(
"dataset",
"sql",
keys=["add_columns"],
)
not_legacy = not_legacy["add_columns"]
if not_legacy:
return self.create_resource("dataset", **kwargs)
Comment from Vitor-Avila (Contributor, Author) on lines 563 to +574:

I thought about doing the validation first and then changing line 567 to `if "sql" not in kwargs or not_legacy`, so that we would have a single return. But then I realized that the integration most commonly syncs physical datasets, and that change would always execute `validate_key_in_resource_schema`, which is another API request. With that in mind, I kept this in a second block, so that we only call the extra endpoint when needed (trying not to increase the dataset sync duration).
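
A compact sketch of the control flow the comment describes, with names taken from the diff (method body abridged):

```python
def create_dataset(self, **kwargs):  # method of SupersetClient, abridged
    if "sql" not in kwargs:
        # Physical dataset: create it directly, no extra API call needed.
        return self.create_resource("dataset", **kwargs)

    # Virtual dataset: spend one extra /_info request to detect legacy instances.
    not_legacy = self.validate_key_in_resource_schema(
        "dataset",
        "sql",
        keys=["add_columns"],
    )["add_columns"]
    if not_legacy:
        return self.create_resource("dataset", **kwargs)

    # Legacy fallback: run the query to infer column types, then save.
    ...
```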


        # run query to determine column types
payload = self._run_query(
database_id=kwargs["database"],
@@ -539,7 +590,7 @@ def create_dataset(self, **kwargs: Any) -> Any:
for column in columns:
column["column_name"] = column["name"]
column["groupby"] = True
if column["is_dttm"]:
if column.get("is_dttm") or column.get("is_date"):
column["type_generic"] = 2
elif column["type"] is None:
column["type"] = "UNKNOWN"
@@ -564,7 +615,7 @@ def create_dataset(self, **kwargs: Any) -> Any:

payload = response.json()

return payload["data"]
return payload["data"] if "data" in payload else {"id": payload["table_id"]}
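For reference, a hedged sketch of the two response shapes this return statement reconciles; the "data" and "table_id" key names come from the diff, while the surrounding payload contents are illustrative:

```python
# v1 API: the created dataset is nested under "data" (contents illustrative)
new_style = {"data": {"id": 42, "table_name": "my_virtual_dataset"}}
# legacy endpoint: only a "table_id" field identifies the new dataset
legacy_style = {"table_id": 42}

def created_dataset(payload: dict) -> dict:
    return payload["data"] if "data" in payload else {"id": payload["table_id"]}

assert created_dataset(new_style)["id"] == created_dataset(legacy_style)["id"] == 42
```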

def update_dataset(
self,
15 changes: 13 additions & 2 deletions src/preset_cli/cli/superset/sync/dbt/command.py
@@ -84,7 +84,7 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-locals
profiles: Optional[str] = None,
exposures: Optional[str] = None,
import_db: bool = False,
-    disallow_edits: bool = True,
+    disallow_edits: bool = False,
Comment from Vitor-Avila (Contributor, Author):

I noticed this was set to True here even though the default is False, so I updated it purely for consistency; it doesn't really change the result. It should be totally fine to revert it to True if we want to.

external_url_prefix: str = "",
exposures_only: bool = False,
preserve_columns: bool = False,
@@ -102,6 +102,17 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-locals
profiles = os.path.expanduser("~/.dbt/profiles.yml")

file_path = Path(file)

if "MANAGER_URL" not in ctx.obj and disallow_edits:
warnings.warn(
(
"The managed externally feature was only introduced in Superset v1.5."
"Make sure you are running a compatible version."
),
category=UserWarning,
stacklevel=2,
)
Comment from Vitor-Avila (Contributor, Author) on lines +106 to +114:

I thought about only including is_managed_externally in the update payloads when the --disallow-edits flag is used, but then I realized that if a user wanted to trigger a new sync without the flag to set it back to False, that wouldn't work. So I only added this warning, which, combined with the error raised on legacy instances, should be pretty straightforward (the error actually happens even if --disallow-edits isn't included).


if file_path.name == "manifest.json":
warnings.warn(
(
@@ -324,7 +335,7 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
exclude: Tuple[str, ...],
exposures: Optional[str] = None,
job_id: Optional[int] = None,
-    disallow_edits: bool = True,
+    disallow_edits: bool = False,
external_url_prefix: str = "",
exposures_only: bool = False,
preserve_columns: bool = False,