refactor(ingest/unity): Use databricks_sdk for all requests #8237
"Use if you only want to ingest one metastore and "
"do not want to grant your ingestion service account the admin role."
),
_only_ingest_assigned_metastore_removed = pydantic_removed_field(
This is actually the only mode supported, because `workspace_client` queries via the workspace, and there is only one metastore per workspace.
I haven't been able to fully test this because I don't have the AWS permissions to create a second metastore and workspace :| but based on the Databricks API docs, it doesn't seem like you can get catalogs from multiple metastores, as we were trying to do.
I have confirmed this now. To support multiple metastores, I think we'll want the config to take a map from workspace URL -> token.
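A rough sketch of what such a config could look like, for discussion only. The class name `MultiWorkspaceConfig`, the field `workspace_tokens`, and the example URLs are all hypothetical; nothing like this exists in the PR, and a real version would presumably be a pydantic model rather than a plain dataclass.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, Tuple


@dataclass
class MultiWorkspaceConfig:
    """Hypothetical config shape: one access token per workspace URL."""

    # Maps workspace URL -> access token (illustrative field name).
    workspace_tokens: Dict[str, str] = field(default_factory=dict)

    def iter_workspaces(self) -> Iterator[Tuple[str, str]]:
        """Yield (workspace_url, token) pairs, stripping trailing slashes."""
        for url, token in self.workspace_tokens.items():
            if not token:
                raise ValueError(f"Missing token for workspace {url}")
            yield url.rstrip("/"), token


# Example values are placeholders, not real workspaces.
config = MultiWorkspaceConfig(
    workspace_tokens={
        "https://workspace-a.example.com/": "token-a",
        "https://workspace-b.example.com": "token-b",
    }
)
workspaces = list(config.iter_workspaces())
```

Since each workspace has exactly one metastore, the source could then build one client per `(url, token)` pair and ingest each metastore in turn.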
I think you are right, this is what I remember as well.
@@ -39,7 +39,6 @@ def get_workunits(
for future in as_completed(futures):
wu: Optional[MetadataWorkUnit] = future.result()
if wu:
self.report.num_profile_workunits_emitted += 1
Removing this as it's handled by reporting workunits
if response.get("tables") is None:
logger.info(
f"Tables not found for schema {schema.catalog.name}.{schema.name}"
with patch("databricks.sdk.service.catalog.TableInfo", TableInfoWithGeneration):
Not sure if I went too far with this. This was my solution to the issue of (a) wanting to use the workspace client method and (b) still wanting to keep the `generation` field, in case we want it. That field is not documented anywhere, but it seems potentially useful.
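To make the idea concrete, here is a minimal, self-contained sketch of the patching approach. It does not import the Databricks SDK: the `TableInfo` class, the `catalog` namespace, and `list_tables` below are simplified stand-ins I made up for illustration, so treat it as an outline of the technique rather than the code in this PR.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Dict, List, Optional
from unittest.mock import patch


@dataclass
class TableInfo:
    """Stand-in for the SDK's TableInfo; drops fields it does not know."""

    name: str

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "TableInfo":
        return cls(name=d["name"])


@dataclass
class TableInfoWithGeneration(TableInfo):
    """Subclass that also keeps the undocumented `generation` field."""

    generation: Optional[int] = None

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "TableInfoWithGeneration":
        return cls(name=d["name"], generation=d.get("generation"))


# Stand-in for the module that client code resolves TableInfo from.
catalog = SimpleNamespace(TableInfo=TableInfo)


def list_tables(raw: List[Dict[str, Any]]) -> List[TableInfo]:
    # Looks up catalog.TableInfo at call time, so a patch takes effect.
    return [catalog.TableInfo.from_dict(item) for item in raw]


raw_response = [{"name": "orders", "generation": 3}]

# Inside the patch, parsing goes through the subclass and keeps `generation`.
with patch.object(catalog, "TableInfo", TableInfoWithGeneration):
    patched = list_tables(raw_response)

# Outside the patch, the original class is restored and the field is dropped.
unpatched = list_tables(raw_response)
```

The trade-off is that this depends on the client resolving the class by name at call time, which is an implementation detail that could change between SDK versions.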
Maybe worth asking Serge about this
total_results = response["totalResults"]
for principal in response["Resources"]:
yield self._create_service_principal(principal)
for principal in self._workspace_client.service_principals.list():
I believe from our call, pagination is just not supported for this endpoint (even though it looks like it is). Ideally, I'd pull the list of service principals from ingestion, then call this endpoint to figure out what they are, but that would require a big code refactor and I'm not sure how we'd generate workunits in real time, rather than all at the end, if we did this. So for now, I think it's fine to just get them all at once
@@ -64,12 +70,12 @@
class CommonProperty:
id: str
name: str
type: str
Thought this got pretty confusing with all the different `type` values floating around, and it was barely used
@@ -115,7 +122,7 @@ class ServicePrincipal:


@dataclass(frozen=True, order=True)
class TableReference:
metastore_id: str
Thought this was confusing too, because `metastore` (name) and `metastore_id` are different things
c6eed66 to f8e735d
Note to self: remove |
I just left a few small comments, but overall this looks good
@@ -22,7 +22,7 @@ class DatasetContainerSubTypes(str, Enum):
DATABASE = "Database"
SCHEMA = "Schema"
# System-Specific SubTypes
PRESTO_CATALOG = "Catalog"
CATALOG = "Catalog" # Presto or Unity Catalog
Nice
self._escape_sequence(obj["name"]),
),
name=obj.name,
id="{}.{}".format(metastore.id, self._escape_sequence(obj.name)),
id="{}.{}".format(metastore.id, self._escape_sequence(obj.name)),
id=f"{metastore_id}.{self._escape_sequence(obj.name)}",
def _create_table(self, schema: Schema, obj: Any) -> Table:
table_id: str = "{}.{}".format(schema.id, self._escape_sequence(obj["name"]))
def _create_table(self, schema: Schema, obj: TableInfoWithGeneration) -> Table:
table_id: str = "{}.{}".format(schema.id, self._escape_sequence(obj.name))
table_id: str = "{}.{}".format(schema.id, self._escape_sequence(obj.name))
table_id: str = f"{schema.id}.{self._escape_sequence(obj.name)}"
properties=obj.properties or {},
owner=obj.owner,
generation=obj.generation,
created_at=datetime.utcfromtimestamp(obj.created_at / 1000),
created_at=datetime.utcfromtimestamp(obj.created_at / 1000),
created_at=datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc),
generation=obj.generation,
created_at=datetime.utcfromtimestamp(obj.created_at / 1000),
created_by=obj.created_by,
updated_at=datetime.utcfromtimestamp(obj.updated_at / 1000)
updated_at=datetime.utcfromtimestamp(obj.updated_at / 1000)
updated_at=datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
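For context on the two timestamp suggestions, a small standalone sketch of the difference. The millisecond value is an arbitrary example, not taken from the PR. `utcfromtimestamp` returns a naive datetime (no `tzinfo`), while `fromtimestamp(..., tz=timezone.utc)` returns a timezone-aware one; `utcfromtimestamp` is also deprecated as of Python 3.12, hence the warning filter here.

```python
import warnings
from datetime import datetime, timezone

created_at_ms = 1_686_000_000_000  # arbitrary example epoch milliseconds

with warnings.catch_warnings():
    # utcfromtimestamp is deprecated since Python 3.12; silence that here.
    warnings.simplefilter("ignore", DeprecationWarning)
    naive = datetime.utcfromtimestamp(created_at_ms / 1000)

aware = datetime.fromtimestamp(created_at_ms / 1000, tz=timezone.utc)

# Same wall-clock fields, but only `aware` carries its UTC offset.
same_wall_clock = aware.replace(tzinfo=None) == naive
```

A naive value is interpreted as local time by methods such as `.timestamp()`, so carrying the UTC offset explicitly avoids silent shifts on machines that are not set to UTC.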
display_name=display_name,
application_id=obj["applicationId"],
active=obj.get("active"),
id="{}.{}".format(obj.id, self._escape_sequence(obj.display_name)),
id="{}.{}".format(obj.id, self._escape_sequence(obj.display_name)),
id=f"{obj.id}.{obj.display_name}",
Closing, as the changes were merged by #8238