diff --git a/expts_manager/Expts_manager.py b/expts_manager/Expts_manager.py index 50333d2..718b478 100755 --- a/expts_manager/Expts_manager.py +++ b/expts_manager/Expts_manager.py @@ -191,15 +191,18 @@ def _clone_repo(branch_name, url, path, tool_name): # make_diag_table is [optional] self.diag_path = ( - os.path.join(self.dir_manager, self.diag_dir_name) + os.path.join(self.dir_manager, self.test_path, self.diag_dir_name) if self.diag_dir_name else None ) - _clone_repo( - self.diag_branch_name, self.diag_url, self.diag_path, self.diag_dir_name - ) - sys.path.extend([utils_path, self.diag_path]) + if self.diag_path is not None: + _clone_repo( + self.diag_branch_name, self.diag_url, self.diag_path, self.diag_dir_name + ) + sys.path.extend([utils_path, self.diag_path]) + else: + sys.path.extend([utils_path]) # load modules from om3-utils from om3utils import MOM6InputParser @@ -237,16 +240,6 @@ def manage_ctrl_expt(self): ) # extract specific configuration via commit hash self._extract_config_via_commit() - - # [optional] modify diag_table - if self.diag_ctrl and self.diag_path: - self._copy_diag_table(base_path) - - # setup the control experiments - self._setup_ctrl_expt() - - # Checks the current state of the repo, commits relevant changes. - self._check_and_commit_changes() else: # clone the template repo and setup the control branch self._clone_template_repo() @@ -254,15 +247,12 @@ def manage_ctrl_expt(self): # extract specific configuration via commit hash self._extract_config_via_commit() - # [optional] modify diag_table - if self.diag_ctrl and self.diag_path: - self._copy_diag_table(base_path) + # [optional] modify diag_table + if self.diag_ctrl and self.diag_path: + self._copy_diag_table(base_path) - # setup the control experiments - self._setup_ctrl_expt() - - # Checks the current state of the repo, commits relevant changes. - self._check_and_commit_changes() + # setup the control experiments + self._setup_ctrl_expt() # check exisiting pbs jobs pbs_jobs = self._output_existing_pbs_jobs() @@ -273,6 +263,10 @@ def manage_ctrl_expt(self): else: duplicated_bool = False + if not duplicated_bool: + # Checks the current state of the repo, commits relevant changes. + self._check_and_commit_changes() + # start control runs, count existing runs and do additional runs if needed self._start_experiment_runs( base_path, self.base_dir_name, duplicated_bool, ctrl_nruns @@ -317,14 +311,6 @@ def _count_file_nums(self): return len(os.listdir(self.base_path)) def _setup_ctrl_expt(self): - # Update configuration files, namelist and MOM_input for the control experiment if needed. - self._update_contrl_params() - - # Payu setup && sweep to ensure the changes correctly && remove the `work` directory for the control run - command = f"cd {self.base_path} && payu setup && payu sweep" - subprocess.run(command, shell=True, check=False) - - def _update_contrl_params(self): """ Modifies parameters based on the input YAML configuration for the ctrl experiment. @@ -369,11 +355,11 @@ def _update_contrl_params(self): elif file_name == "config.yaml": file_read = self._read_ryaml(tmp_file_path) yaml_data["jobname"] = self.base_dir_name + self._update_config_entries(file_read, yaml_data) if yaml_data["jobname"] != self.base_dir_name: raise ValueError( f"jobname must be the same as {self.base_dir_name}!" ) - self._update_config_entries(file_read, yaml_data) self._write_ryaml(file_read, tmp_file_path) # Update parameters from `MOM_input` @@ -657,17 +643,13 @@ def setup_expts(self, parameter_block): self._copy_diag_table(expt_path) # symlink restart directories - if self.startfrom_str != "rest": - self._generate_restart_symlink(expt_path) + restartpath = self._generate_restart_symlink(expt_path) # optionally update nuopc.runconfig for perturbation runs self._update_nuopc_config_perturb(expt_path) self._update_config_yaml_perturb(expt_path, expt_name) - self._update_metadata_yaml_perturb(expt_path, param_dict) - - # clean `work` directory for failed jobs - self._clean_workspace(expt_path) + self._update_metadata_yaml_perturb(expt_path, param_dict, restartpath) # check exisiting pbs jobs pbs_jobs = self._output_existing_pbs_jobs() @@ -803,6 +785,7 @@ def _generate_restart_symlink(self, expt_path): if os.path.exists(dest) or os.path.islink(dest): os.remove(dest) # remove symlink os.symlink(restartpath, dest) # generate symlink + return restartpath def _update_nuopc_config_perturb(self, path): """ @@ -828,7 +811,7 @@ def _update_config_yaml_perturb(self, expt_path, expt_name): config_data["jobname"] = expt_name self._write_ryaml(config_data, config_path) - def _update_metadata_yaml_perturb(self, expt_path, param_dict): + def _update_metadata_yaml_perturb(self, expt_path, param_dict, restartpath): """ Updates the `metadata.yaml` file with relevant metadata. @@ -838,34 +821,19 @@ def _update_metadata_yaml_perturb(self, expt_path, param_dict): """ metadata_path = os.path.join(expt_path, "metadata.yaml") metadata = self._read_ryaml(metadata_path) # load metadata of each perturbation - if self.startfrom_str == "rest": - restartpath = "rest" self._update_metadata_description(metadata, restartpath) # update `description` - self._remove_metadata_comments( - "description", metadata - ) # remove None comments from `description` - keywords = self._extract_metadata_keywords( - param_dict - ) # extract parameters from the change list - metadata["keywords"] = ( - f"{self.base_dir_name}, {self.branch_perturb}, {keywords}" # update `keywords` - ) - self._remove_metadata_comments( - "keywords", metadata - ) # remove None comments from `keywords` - self._write_ryaml(metadata, metadata_path) # write to file - def _clean_workspace(self, dir_path): - """ - Cleans `work` directory for failed jobs. - """ - work_dir = os.path.join(dir_path, "work") - # in case any failed job - if os.path.islink(work_dir) and os.path.isdir(work_dir): - # Payu sweep && setup to ensure the changes correctly && remove the `work` directory - command = f"payu sweep && payu setup" - subprocess.run(command, shell=True, check=False) - print(f"Clean up a failed job {work_dir} and prepare it for resubmission.") + # remove None comments from `description` + self._remove_metadata_comments("description", metadata) + keywords = self._extract_metadata_keywords(param_dict) + + # extract parameters from the change list, and update `keywords` + metadata["keywords"] = (f"{self.base_dir_name}, {self.branch_perturb}, {keywords}") + + # remove None comments from `keywords` + self._remove_metadata_comments("keywords", metadata) + + self._write_ryaml(metadata, metadata_path) # write to file def _output_existing_pbs_jobs(self): """ @@ -902,11 +870,15 @@ def _output_existing_pbs_jobs(self): key, value = line.split(" = ", 1) # save key current_key = key.strip() current_value = value.strip() + + # remove the `current_job_status` file + os.remove(current_job_status_path) + return pbs_jobs def _check_duplicated_jobs(self, pbs_jobs, expt_path): - def extract_current_and_parent_folder(tmp_path): + def extract_current_and_parent_path(tmp_path): # extract base_name or expt_name from pbs jobs folder_path = "/" + "/".join(tmp_path.split("/")[1:-1]) @@ -930,7 +902,7 @@ def extract_current_and_parent_folder(tmp_path): for parent_path, folder_paths in parent_paths.items(): if expt_path in folder_paths: print( - f"You have duplicated runs for folder '{os.path.basename(expt_path)}' in the same folder '{parent_path}', " + f"-- You have duplicated runs for folder '{os.path.basename(expt_path)}' in the same folder '{parent_path}', " f"hence not submitting this job!\n" ) duplicated = True @@ -961,6 +933,10 @@ def runs(): ) if not duplicated: + + # clean `work` directory for failed jobs + self._clean_workspace(expt_path) + if num_runs > 0: runs() else: @@ -968,6 +944,18 @@ def runs(): f"-- number of runs is {num_runs}, hence no new experiments will start!\n" ) + def _clean_workspace(self, dir_path): + """ + Cleans `work` directory for failed jobs. + """ + work_dir = os.path.join(dir_path, "work") + # in case any failed job + if os.path.islink(work_dir) and os.path.isdir(work_dir): + # Payu sweep && setup to ensure the changes correctly && remove the `work` directory + command = f"payu sweep && payu setup" + subprocess.run(command, shell=True, check=False) + print(f"Clean up a failed job {work_dir} and prepare it for resubmission.") + def _check_skipping(self, param_dict, nml_group, parameter_block, expt_path): """ Checks if the tuning parameter matches the control experiment, @@ -990,9 +978,8 @@ def _check_skipping(self, param_dict, nml_group, parameter_block, expt_path): # load nml of the control experiment self.nml_ctrl = f90nml.read(os.path.join(self.base_path, parameter_block)) - if all( - cn in self.nml_ctrl.get(nml_group, {}) for cn in nml_name - ): # nml_name (i.e. tunning parameter) may not be found in the control experiment + # nml_name (i.e. tunning parameter) may not be found in the control experiment + if all(cn in self.nml_ctrl.get(nml_group, {}) for cn in nml_name): if "turning_angle" in param_dict: skip = ( self.nml_ctrl[nml_group]["cosw"] == cosw @@ -1024,9 +1011,8 @@ def _check_skipping(self, param_dict, nml_group, parameter_block, expt_path): ) return - if ( - self.tag_model == "mom6" - ): # might need MOM_parameter.all, because many parameters are in-default hence not shown up in `MOM_input` + # might need MOM_parameter.all, because many parameters are in-default hence not shown up in `MOM_input` + if (self.tag_model == "mom6"): # TODO pass @@ -1159,8 +1145,8 @@ def main(self): yamlfile = os.path.join(self.dir_manager, INPUT_YAML) self.load_variables(yamlfile) - self.load_tools() self.create_test_path() + self.load_tools() self.manage_ctrl_expt() if self.run_namelists: print("==== Start perturbation experiments ====") diff --git a/expts_manager/Expts_manager.yaml b/expts_manager/Expts_manager.yaml index c844701..4bcfd26 100644 --- a/expts_manager/Expts_manager.yaml +++ b/expts_manager/Expts_manager.yaml @@ -41,10 +41,11 @@ config.yaml: ncpus: 240 mem: 960GB walltime: 24:00:00 - jobname: base-1deg_jra55_ryf_ctrl +# jobname: base-1deg_jra55_ryf_ctrl metadata: enable: true runlog: true + restart_freq: 1 cpl_dt: 1800.0 # Coupling timestep in the `nuopc_runseq` nuopc.runconfig: CLOCK_attributes: