
Support for the experimental Databricks CLI launcher (#517)
nfx committed Nov 7, 2023
1 parent d16a6ee commit ac7b8bb
Showing 4 changed files with 104 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -148,4 +148,5 @@ dev/cleanup.py
.databricks
.vscode

-.python-version
+.python-version
+.databricks-login.json
19 changes: 19 additions & 0 deletions labs.yml
@@ -0,0 +1,19 @@
---
name: ucx
description: Unity Catalog Migration Toolkit (UCX)
install:
  warehouse_types:
    - PRO
  script: src/databricks/labs/ucx/install.py
entrypoint: src/databricks/labs/ucx/cli.py
min_python: 3.10
commands:
  - name: open-remote-config
    description: Opens remote configuration in the browser

  - name: workflows
    description: Show deployed workflows and their state
    table_template: |-
      Step\tState\tStarted
      {{range .}}{{.step}}\t{{.state}}\t{{.started}}
      {{end}}
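
The table_template above is a Go text/template that the experimental labs launcher presumably applies to the command's stdout; workflows (see cli.py below) prints a JSON list of {"step", "state", "started"} objects, so the template renders a tab-separated table. A rough Python equivalent of that rendering, using a hypothetical one-row payload for illustration:

import json

# hypothetical stdout of the `workflows` command
raw = '[{"step": "assessment", "state": "SUCCESS", "started": 1699351200000}]'

print("Step\tState\tStarted")
for row in json.loads(raw):
    # {{range .}} iterates the array; {{.step}}, {{.state}}, {{.started}} pick the fields
    print(f"{row['step']}\t{row['state']}\t{row['started']}")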
51 changes: 51 additions & 0 deletions src/databricks/labs/ucx/cli.py
@@ -0,0 +1,51 @@
import json
import logging
import sys
import webbrowser

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.install import WorkspaceInstaller

logger = logging.getLogger("databricks.labs.ucx")


def workflows():
    ws = WorkspaceClient()
    installer = WorkspaceInstaller(ws)
    logger.info("Fetching deployed jobs...")
    print(json.dumps(installer.latest_job_status()))


def open_remote_config():
    ws = WorkspaceClient()
    installer = WorkspaceInstaller(ws)

    ws_file_url = installer.notebook_link(installer.config_file)
    webbrowser.open(ws_file_url)


MAPPING = {
    "open-remote-config": open_remote_config,
    "workflows": workflows,
}


def main(raw):
    payload = json.loads(raw)
    command = payload["command"]
    if command not in MAPPING:
        msg = f"cannot find command: {command}"
        raise KeyError(msg)
    flags = payload["flags"]
    log_level = flags.pop("log_level")
    if log_level != "disabled":
        databricks_logger = logging.getLogger("databricks")
        databricks_logger.setLevel(log_level.upper())

    kwargs = {k.replace("-", "_"): v for k, v in flags.items()}
    MAPPING[command](**kwargs)


if __name__ == "__main__":
    main(*sys.argv[1:])
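
A minimal sketch of the payload contract between the experimental launcher and this entrypoint, inferred from main() above rather than from launcher documentation: the whole request arrives as a single JSON argument carrying a command name and a flags map (log_level is always present; hyphenated flag names are converted to keyword arguments).

import json
import subprocess
import sys

payload = json.dumps(
    {
        "command": "workflows",          # must be a key of MAPPING
        "flags": {"log_level": "info"},  # "disabled" skips logger configuration
    }
)

# hypothetical manual invocation, standing in for `databricks labs ucx workflows`
subprocess.run([sys.executable, "src/databricks/labs/ucx/cli.py", payload], check=True)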
53 changes: 32 additions & 21 deletions src/databricks/labs/ucx/install.py
@@ -108,7 +108,7 @@ def _run_configured(self):
        self._install_spark_config_for_hms_lineage()
        self._create_dashboards()
        self._create_jobs()
-        readme = f'{self._notebook_link(f"{self._install_folder}/README.py")}'
+        readme = f'{self.notebook_link(f"{self._install_folder}/README.py")}'
        msg = f"Installation completed successfully! Please refer to the {readme} notebook for next steps."
        logger.info(msg)

@@ -249,14 +249,14 @@ def _install_folder(self):
        return f"/Users/{self._my_username}/.{self._prefix}"

    @property
-    def _config_file(self):
+    def config_file(self):
        return f"{self._install_folder}/config.yml"

    @property
    def _current_config(self):
        if hasattr(self, "_config"):
            return self._config
-        with self._ws.workspace.download(self._config_file) as f:
+        with self._ws.workspace.download(self.config_file) as f:
            self._config = WorkspaceConfig.from_bytes(f.read())
        return self._config
@@ -279,12 +279,10 @@ def _configure_inventory_database(self):
        return inventory_database

    def _configure(self):
-        ws_file_url = self._notebook_link(self._config_file)
+        ws_file_url = self.notebook_link(self.config_file)
        try:
-            self._ws.workspace.get_status(self._config_file)
+            self._ws.workspace.get_status(self.config_file)
            logger.info(f"UCX is already configured. See {ws_file_url}")
-            if self._prompts and self._question("Open config file in the browser", default="yes") == "yes":
-                webbrowser.open(ws_file_url)
            return
        except DatabricksError as err:
            if err.error_code != "RESOURCE_DOES_NOT_EXIST":
@@ -377,13 +375,13 @@ def _write_config(self):
        self._ws.workspace.mkdirs(self._install_folder)

        config_bytes = yaml.dump(self._config.as_dict()).encode("utf8")
-        logger.info(f"Creating configuration file: {self._config_file}")
-        self._ws.workspace.upload(self._config_file, config_bytes, format=ImportFormat.AUTO)
+        logger.info(f"Creating configuration file: {self.config_file}")
+        self._ws.workspace.upload(self.config_file, config_bytes, format=ImportFormat.AUTO)

    def _create_jobs(self):
        logger.debug(f"Creating jobs from tasks in {main.__name__}")
        remote_wheel = self._upload_wheel()
-        self._deployed_steps = self._deployed_steps()
+        self._deployed_steps = self.deployed_steps()
        desired_steps = {t.workflow for t in _TASKS.values()}
        wheel_runner = None
@@ -424,7 +422,7 @@ def _step_list(cls) -> list[str]:
    def _create_readme(self):
        md = [
            "# UCX - The Unity Catalog Migration Assistant",
-            f'To troubleshoot, see [debug notebook]({self._notebook_link(f"{self._install_folder}/DEBUG.py")}).\n',
+            f'To troubleshoot, see [debug notebook]({self.notebook_link(f"{self._install_folder}/DEBUG.py")}).\n',
            "Here are the URLs and descriptions of workflows that trigger various stages of migration.",
            "All jobs are defined with necessary cluster configurations and DBR versions.\n",
        ]
@@ -457,7 +455,7 @@ def _create_readme(self):
        intro = "\n".join(preamble + [f"# MAGIC {line}" for line in md])
        path = f"{self._install_folder}/README.py"
        self._ws.workspace.upload(path, intro.encode("utf8"), overwrite=True)
-        url = self._notebook_link(path)
+        url = self.notebook_link(path)
        logger.info(f"Created README notebook with job overview: {url}")
        msg = "Open job overview in README notebook in your home directory ?"
        if self._prompts and self._question(msg, default="yes") == "yes":
@@ -467,22 +465,22 @@ def _replace_inventory_variable(self, text: str) -> str:
        return text.replace("$inventory", f"hive_metastore.{self._current_config.inventory_database}")

    def _create_debug(self, remote_wheel: str):
-        readme_link = self._notebook_link(f"{self._install_folder}/README.py")
+        readme_link = self.notebook_link(f"{self._install_folder}/README.py")
        job_links = ", ".join(
            f"[{self._name(step_name)}]({self._ws.config.host}#job/{job_id})"
            for step_name, job_id in self._deployed_steps.items()
        )
        path = f"{self._install_folder}/DEBUG.py"
-        logger.debug(f"Created debug notebook: {self._notebook_link(path)}")
+        logger.debug(f"Created debug notebook: {self.notebook_link(path)}")
        self._ws.workspace.upload(
            path,
            DEBUG_NOTEBOOK.format(
-                remote_wheel=remote_wheel, readme_link=readme_link, job_links=job_links, config_file=self._config_file
+                remote_wheel=remote_wheel, readme_link=readme_link, job_links=job_links, config_file=self.config_file
            ).encode("utf8"),
            overwrite=True,
        )

-    def _notebook_link(self, path: str) -> str:
+    def notebook_link(self, path: str) -> str:
        return f"{self._ws.config.host}/#workspace{path}"

    def _choice_from_dict(self, text: str, choices: dict[str, Any]) -> Any:
@@ -569,8 +567,8 @@ def _apply_cluster_overrides(settings: dict[str, any], overrides: dict[str, str]
    def _upload_wheel_runner(self, remote_wheel: str):
        # TODO: we have to be doing this workaround until ES-897453 is solved in the platform
        path = f"{self._install_folder}/wheels/wheel-test-runner-{self._version}.py"
-        logger.debug(f"Created runner notebook: {self._notebook_link(path)}")
-        py = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheel, config_file=self._config_file).encode("utf8")
+        logger.debug(f"Created runner notebook: {self.notebook_link(path)}")
+        py = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheel, config_file=self.config_file).encode("utf8")
        self._ws.workspace.upload(path, py, overwrite=True)
        return path
@@ -624,7 +622,7 @@ def _job_notebook_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
                base_parameters={
                    "inventory_database": self._current_config.inventory_database,
                    "task": task.name,
-                    "config": f"/Workspace{self._config_file}",
+                    "config": f"/Workspace{self.config_file}",
                }
                | EXTRA_TASK_PARAMS,
            ),
@@ -637,7 +635,7 @@ def _job_wheel_task(self, jobs_task: jobs.Task, task: Task, dbfs_path: str) -> jobs.Task:
            python_wheel_task=jobs.PythonWheelTask(
                package_name="databricks_labs_ucx",
                entry_point="runtime",  # [project.entry-points.databricks] in pyproject.toml
-                named_parameters={"task": task.name, "config": f"/Workspace{self._config_file}"} | EXTRA_TASK_PARAMS,
+                named_parameters={"task": task.name, "config": f"/Workspace{self.config_file}"} | EXTRA_TASK_PARAMS,
            ),
        )
@@ -774,7 +772,7 @@ def _cluster_node_type(self, spec: compute.ClusterSpec) -> compute.ClusterSpec:
            )
        return replace(spec, gcp_attributes=compute.GcpAttributes(availability=compute.GcpAvailability.ON_DEMAND_GCP))

-    def _deployed_steps(self):
+    def deployed_steps(self):
        deployed_steps = {}
        logger.debug(f"Fetching all jobs to determine already deployed steps for app={self._app}")
        for j in self._ws.jobs.list():
@@ -820,6 +818,19 @@ def _get_ext_hms_conf_from_policy(cluster_policy):
                spark_conf_dict[key[11:]] = cluster_policy[key]["value"]
        return instance_profile, spark_conf_dict

+    def latest_job_status(self) -> list[dict]:
+        latest_status = []
+        for step, job_id in self.deployed_steps().items():
+            job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1))
+            latest_status.append(
+                {
+                    "step": step,
+                    "state": "UNKNOWN" if not job_runs else str(job_runs[0].state.result_state),
+                    "started": "" if not job_runs else job_runs[0].start_time,
+                }
+            )
+        return latest_status


if __name__ == "__main__":
    ws = WorkspaceClient(product="ucx", product_version=__version__)
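One detail of the new latest_job_status(): the Jobs API reports run start_time in epoch milliseconds, so the "started" column comes back as a raw integer (or "" when a step has never run). A small sketch of converting it for display; this is an illustration, not part of the commit:

from datetime import datetime, timezone

start_time_ms = 1699351200000  # hypothetical job_runs[0].start_time
print(datetime.fromtimestamp(start_time_ms / 1000, tz=timezone.utc).isoformat())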
