diff --git a/viadot/flows/transform_and_catalog.py b/viadot/flows/transform_and_catalog.py index 08ac6b895..6c739978e 100644 --- a/viadot/flows/transform_and_catalog.py +++ b/viadot/flows/transform_and_catalog.py @@ -191,8 +191,8 @@ def gen_flow(self) -> Flow: run_select = self.dbt_selects.get("run") run_select_safe = f"-s {run_select}" if run_select is not None else "" - run = ShellTask( - name="dbt_task_run", + run_model = ShellTask( + name="dbt_task_run_model", command=f"dbt run {run_select_safe} {dbt_target_option}", helper_script=f"cd {local_dbt_repo_path}", return_all=True, @@ -202,24 +202,14 @@ def gen_flow(self) -> Flow: test_select = self.dbt_selects.get("test", run_select) test_select_safe = f"-s {test_select}" if test_select is not None else "" - test = ShellTask( - name="dbt_task_test", + run_tests = ShellTask( + name="dbt_task_run_tests", command=f"dbt test {test_select_safe} {dbt_target_option}", helper_script=f"cd {local_dbt_repo_path}", return_all=True, stream_output=True, ).bind(flow=self) - # Generate docs - # Produces `catalog.json`, `run-results.json`, and `manifest.json` - - generate_catalog_json = custom_shell_task.bind( - name="dbt_task_docs_generate", - command=f"dbt docs generate {dbt_target_option} --no-compile", - helper_script=f"cd {self.dbt_project_path}", - flow=self, - ) - # Upload build metadata to Luma path_expanded = os.path.expandvars(self.metadata_dir_path) metadata_dir_path = Path(path_expanded) @@ -237,8 +227,7 @@ def gen_flow(self) -> Flow: dbt_clean_up.set_upstream(clone, flow=self) pull_dbt_deps.set_upstream(dbt_clean_up, flow=self) - run.set_upstream(pull_dbt_deps, flow=self) - test.set_upstream(run, flow=self) - generate_catalog_json.set_upstream(test, flow=self) - upload_metadata_luma.set_upstream(generate_catalog_json, flow=self) + run_model.set_upstream(pull_dbt_deps, flow=self) + run_tests.set_upstream(run_model, flow=self) + upload_metadata_luma.set_upstream(run_tests, flow=self) _cleanup_repo.set_upstream(upload_metadata_luma, flow=self) diff --git a/viadot/tasks/luma.py b/viadot/tasks/luma.py index 11eb91e45..bce06ef2d 100644 --- a/viadot/tasks/luma.py +++ b/viadot/tasks/luma.py @@ -12,7 +12,7 @@ def __init__( self, metadata_dir_path: str, url: str = None, - dbt_project_path: str = None, + dbt_project_path: str = "tmp_dbt_repo_dir", credentials_secret: str = None, vault_name: str = None, *args, @@ -26,6 +26,8 @@ def __init__( In the case of dbt, it's dbt project's `target` directory, which contains dbt artifacts (`sources.json`, `catalog.json`, `manifest.json`, and `run_results.json`). url (str, optional): The url of the Luma ingestion API. Defaults to None. + dbt_project_path (str, optional): The path to the dbt project (the directory containing + the `dbt_project.yml` file). Defaults to 'tmp_dbt_repo_dir'. credentials_secret (str, optional): The name of the Azure Key Vault secret containing Luma credentials. Defaults to None. vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None. @@ -38,10 +40,10 @@ def __init__( secret=credentials_secret, vault_name=vault_name ) url = json.loads(credentials_str).get("url") - self.helper_script = dbt_project_path + self.helper_script = f"cd {dbt_project_path}" self.url = url self.metadata_dir_path = metadata_dir_path - self.command = f"luma dbt send-test-results --luma-url {url} --metadata-dir {metadata_dir_path}" + self.command = f"luma dbt send-test-results --luma-url {url} --metadata-dir {metadata_dir_path} --no-config" self.return_all = True self.stream_output = True self.log_stderr = True