From 49cba23664817a0682c7dc5dd4954a72f793d14a Mon Sep 17 00:00:00 2001
From: Eric Wulff <31319227+erwulff@users.noreply.github.com>
Date: Tue, 20 Sep 2022 08:10:14 +0200
Subject: [PATCH] Ray Tune checkpointing fix, allow LR schedules for non-PCGrad
 opt, and more. (#142)

* feat: add option to include SLURM jobid name in training dir

* feat: add command-line option to enable horovod

* feat: Use comet offline logging in Ray Tune runs

* fix: bug in raytune command

* fix: handle TF version-dependent names of the legacy optimizer

* feat: add event and met losses to raytune search space

* feat: added sbatch script for Horovod training on JURECA

* fix: Ray Tune checkpoint saving and loading

* feat: allow lr schedules when not using PCGrad

* chore: add print of loaded opt weights

* fix: handle TF version-dependent names of the legacy optimizer

Former-commit-id: deb05eac9ff7a1d02c531882f21b284896761f03
---
 mlpf/jureca/horovod_pipeline_train.sh |  62 +++++++++
 mlpf/pipeline.py                      | 184 ++++++++++++++------------
 mlpf/raytune/search_space.py          |  49 +++++++
 mlpf/tfmodel/model_setup.py           |   5 +-
 mlpf/tfmodel/utils.py                 |  12 +-
 mlpf/tfmodel/utils_analysis.py        |   2 +-
 6 files changed, 224 insertions(+), 90 deletions(-)
 create mode 100644 mlpf/jureca/horovod_pipeline_train.sh

diff --git a/mlpf/jureca/horovod_pipeline_train.sh b/mlpf/jureca/horovod_pipeline_train.sh
new file mode 100644
index 000000000..343cb0be8
--- /dev/null
+++ b/mlpf/jureca/horovod_pipeline_train.sh
@@ -0,0 +1,62 @@
+#!/bin/sh
+
+#SBATCH --account=raise-ctp2
+#SBATCH --partition=dc-gpu
+#SBATCH --time 24:00:00
+#SBATCH --nodes 1
+#SBATCH --tasks-per-node=1
+#SBATCH --gres=gpu:4
+
+# Job name
+#SBATCH -J pipehorovod
+
+# Output and error logs
+#SBATCH -o logs_slurm/log_%x_%j.out
+#SBATCH -e logs_slurm/log_%x_%j.err
+
+# Add jobscript to job output
+echo "#################### Job submission script. #############################"
+cat $0
+echo "################# End of job submission script. #########################"
+
+
+module --force purge
+module load Stages/2022
+module load GCC GCCcore/.11.2.0 CMake NCCL CUDA cuDNN OpenMPI
+
+export CUDA_VISIBLE_DEVICES=0,1,2,3
+jutil env activate -p raise-ctp2
+
+sleep 1
+nvidia-smi
+
+source /p/project/raise-ctp2/cern/miniconda3/bin/activate tf2
+echo "Python used:"
+which python3
+python3 --version
+
+
+echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
+echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
+echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
+echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
+echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
+echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
+echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
+echo "DEBUG: SLURM_NODEID: $SLURM_NODEID"
+echo "DEBUG: SLURM_LOCALID: $SLURM_LOCALID"
+echo "DEBUG: SLURM_PROCID: $SLURM_PROCID"
+echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
+
+
+export NCCL_DEBUG=INFO
+export OMP_NUM_THREADS=1
+if [ "$SLURM_CPUS_PER_TASK" > 0 ] ; then
+  export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
+fi
+echo $OMP_NUM_THREADS
+
+
+echo 'Starting training.'
+srun --cpu-bind=none python mlpf/pipeline.py train -c $1 -p $2 --comet-offline -j $SLURM_JOBID -m
+echo 'Training done.'
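A note on the OMP_NUM_THREADS guard in the sbatch script above: in POSIX sh the test [ "$SLURM_CPUS_PER_TASK" > 0 ] performs a redirection rather than a numeric comparison, so the branch runs whenever the variable is set; a numeric check (-gt) is presumably what is intended. The following standalone Python sketch, which is not part of the patch and uses an illustrative helper name (resolve_omp_threads), shows the intended logic of deriving OMP_NUM_THREADS from SLURM_CPUS_PER_TASK with a fallback of 1.

    # Illustrative sketch, not part of the patch: derive OMP_NUM_THREADS from
    # SLURM_CPUS_PER_TASK when it is a positive integer, otherwise fall back to 1.
    import os

    def resolve_omp_threads(default=1):
        raw = os.environ.get("SLURM_CPUS_PER_TASK", "")
        try:
            cpus = int(raw)
        except ValueError:
            return default
        return cpus if cpus > 0 else default

    if __name__ == "__main__":
        os.environ["OMP_NUM_THREADS"] = str(resolve_omp_threads())
        print(os.environ["OMP_NUM_THREADS"])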
diff --git a/mlpf/pipeline.py b/mlpf/pipeline.py
index 4a55dd24b..de5fb1e5e 100644
--- a/mlpf/pipeline.py
+++ b/mlpf/pipeline.py
@@ -37,6 +37,7 @@
     get_best_checkpoint,
     get_datasets,
     get_heptfds_dataset,
+    get_latest_checkpoint,
     get_loss_dict,
     get_lr_schedule,
     get_optimizer,
@@ -87,13 +88,17 @@ def main():
 @click.option("--plot-freq", default=None, help="plot detailed validation every N epochs", type=int)
 @click.option("--customize", help="customization function", type=str, default=None)
 @click.option("--comet-offline", help="log comet-ml experiment locally", is_flag=True)
-def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq, customize, comet_offline):
-
+@click.option("-j", "--jobid", help="log log the Slurm job ID in experiments dir", type=str, default=None)
+@click.option("-m", "--horovod_enabled", help="Enable multi-node training using Horovod", is_flag=True)
+def train(
+    config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq, customize, comet_offline, jobid, horovod_enabled
+):
     # tf.debugging.enable_check_numerics()
     """Train a model defined by config"""
 
     config_file_path = config
     config, config_file_stem = parse_config(config, nepochs=nepochs, weights=weights)
+    print(f"loaded config file: {config_file_path}")
 
     if plot_freq:
         config["callbacks"]["plot_freq"] = plot_freq
@@ -102,7 +107,10 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
         config = customization_functions[customize](config)
 
     # Decide tf.distribute.strategy depending on number of available GPUs
-    horovod_enabled = config["setup"]["horovod_enabled"]
+    if horovod_enabled:
+        pass
+    else:
+        horovod_enabled = config["setup"]["horovod_enabled"]
     if horovod_enabled:
         num_gpus = initialize_horovod()
     else:
@@ -146,6 +154,10 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
         experiment.log_code("mlpf/tfmodel/utils.py")
         experiment.log_code(config_file_path)
 
+    if jobid is not None:
+        with open(f"{outdir}/{jobid}.txt", "w") as f:
+            f.write(f"{jobid}\n")
+
     ds_train, num_train_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "train")
     ds_test, num_test_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "test")
     ds_val, ds_info = get_heptfds_dataset(
@@ -176,9 +188,7 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
     with strategy.scope():
         model, optim_callbacks, initial_epoch = model_scope(config, total_steps, weights)
 
-    callbacks = prepare_callbacks(
-        config, outdir, ds_val, comet_experiment=experiment, horovod_enabled=config["setup"]["horovod_enabled"]
-    )
+    callbacks = prepare_callbacks(config, outdir, ds_val, comet_experiment=experiment, horovod_enabled=horovod_enabled)
 
     verbose = 1
     if horovod_enabled:
@@ -194,7 +204,7 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
     model.fit(
         ds_train.repeat(),
         validation_data=ds_test.repeat(),
-        epochs=initial_epoch + config["setup"]["num_epochs"],
+        epochs=config["setup"]["num_epochs"],
         callbacks=callbacks,
         steps_per_epoch=num_train_steps,
         validation_steps=num_test_steps,
@@ -220,7 +230,7 @@ def model_save(outdir, fit_result, model, weights):
     print("Training done.")
 
 
-def model_scope(config, total_steps, weights, horovod_enabled=False):
+def model_scope(config, total_steps, weights=None, horovod_enabled=False):
     lr_schedule, optim_callbacks, lr = get_lr_schedule(config, steps=total_steps)
     opt = get_optimizer(config, lr_schedule)
 
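For reference, the hunks above introduce a command-line flag that takes precedence over the value stored in the YAML config. The following minimal sketch restates that pattern; it is illustrative only and not the repository's code. The option names mirror the patch, while the yaml.safe_load call and the setup.horovod_enabled key are assumed from context.

    # Minimal sketch of the flag-overrides-config pattern; illustrative only.
    import click
    import yaml

    @click.command()
    @click.option("-c", "--config", "config_path", type=str, required=True)
    @click.option("-m", "--horovod_enabled", is_flag=True)
    def train(config_path, horovod_enabled):
        with open(config_path) as f:
            config = yaml.safe_load(f)
        # The flag wins; the config file is only consulted when it was not passed.
        if not horovod_enabled:
            horovod_enabled = config["setup"]["horovod_enabled"]
        click.echo("horovod_enabled={}".format(horovod_enabled))

    if __name__ == "__main__":
        train()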
@@ -241,15 +251,17 @@ def model_scope(config, total_steps, weights, horovod_enabled=False):
 
     loaded_opt = None
     if weights:
-        if lr_schedule:
-            raise Exception("Restoring the optimizer state with a learning rate schedule is currently not supported")
+        if lr_schedule and (opt.__class__.__module__ == "tfmodel.PCGrad_tf"):
+            raise Exception("Restoring the PCGrad optimizer state with a learning rate schedule is currently not supported")
 
         # We need to load the weights in the same trainable configuration as the model was set up
         configure_model_weights(model, config["setup"].get("weights_config", "all"))
         model.load_weights(weights, by_name=True)
+        print("INFO: using checkpointed model weights from: {}".format(weights))
         opt_weight_file = weights.replace("hdf5", "pkl").replace("/weights-", "/opt-")
         if os.path.isfile(opt_weight_file):
             loaded_opt = pickle.load(open(opt_weight_file, "rb"))
+            print("INFO: using checkpointed optimizer weights from: {}".format(opt_weight_file))
         initial_epoch = int(weights.split("/")[-1].split("-")[1])
 
     model.build((1, config["dataset"]["padded_num_elem_size"], config["dataset"]["num_input_features"]))
@@ -291,7 +303,9 @@ def model_weight_setting():
         grad_vars = model.trainable_weights
         zero_grads = [tf.zeros_like(w) for w in grad_vars]
         model.optimizer.apply_gradients(zip(zero_grads, grad_vars))
-        if model.optimizer.__class__.__module__ == "keras.optimizers.optimizer_v1":
+        if (model.optimizer.__class__.__module__ == "keras.optimizers.optimizer_v1") or (
+            model.optimizer.__class__.__module__ == "keras.optimizer_v1"
+        ):
             model.optimizer.optimizer.optimizer.set_weights(loaded_opt["weights"])
         else:
             model.optimizer.set_weights(loaded_opt["weights"])
@@ -571,6 +585,8 @@ def hypertune(config, outdir, ntrain, ntest, recreate):
 
 
 def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=None, ntest=None, name=None, seeds=False):
+    from collections import Counter
+
     from ray import tune
     from ray.tune.integration.keras import TuneReportCheckpointCallback
     from raytune.search_space import set_raytune_search_parameters
@@ -586,6 +602,17 @@ def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=
     if config is not None:
         full_config = set_raytune_search_parameters(search_space=config, config=full_config)
 
+    print("Using comet-ml OfflineExperiment, saving logs locally.")
+    experiment = OfflineExperiment(
+        project_name="particleflow-tf-gen",
+        auto_metric_logging=True,
+        auto_param_logging=True,
+        auto_histogram_weight_logging=True,
+        auto_histogram_gradient_logging=False,
+        auto_histogram_activation_logging=False,
+        offline_directory=tune.get_trial_dir() + "/cometml",
+    )
+
     strategy, num_gpus = get_strategy()
 
     ds_train, num_train_steps = get_datasets(full_config["train_test_datasets"], full_config, num_gpus, "train")
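The weight-restoration logic above relies on a file-naming convention: each Keras weights file "weights-<epoch>-<loss>.hdf5" has a companion pickle "opt-<epoch>-<loss>.pkl" holding the optimizer state, and the starting epoch is recovered from the filename. Below is a standalone sketch of that convention; the helper name is illustrative and not from the patch.

    # Standalone sketch of the checkpoint naming convention used above.
    import os

    def checkpoint_companion_paths(weights_path):
        # "weights-<epoch>-<loss>.hdf5" -> ("opt-<epoch>-<loss>.pkl", <epoch>)
        opt_path = weights_path.replace("hdf5", "pkl").replace("/weights-", "/opt-")
        initial_epoch = int(os.path.basename(weights_path).split("-")[1])
        return opt_path, initial_epoch

    if __name__ == "__main__":
        print(checkpoint_companion_paths("experiments/run1/weights/weights-12-3.456.hdf5"))
        # ('experiments/run1/weights/opt-12-3.456.pkl', 12)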
@@ -612,91 +639,76 @@ def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=
     total_steps = num_train_steps * full_config["setup"]["num_epochs"]
     print("total_steps", total_steps)
 
+    with strategy.scope():
+        weights = get_latest_checkpoint(Path(checkpoint_dir).parent) if (checkpoint_dir is not None) else None
+        model, optim_callbacks, initial_epoch = model_scope(full_config, total_steps, weights=weights)
+
     callbacks = prepare_callbacks(
         full_config,
         tune.get_trial_dir(),
         ds_val,
+        comet_experiment=experiment,
+        horovod_enabled=False,
     )
-    callbacks = callbacks[:-1]  # remove the CustomCallback at the end of the list
-
-    with strategy.scope():
-        lr_schedule, optim_callbacks = get_lr_schedule(full_config, steps=total_steps)
-        callbacks.append(optim_callbacks)
-        opt = get_optimizer(full_config, lr_schedule)
-
-        model = make_model(full_config, dtype=tf.dtypes.float32)
-
-        # Run model once to build the layers
-        model.build((1, full_config["dataset"]["padded_num_elem_size"], full_config["dataset"]["num_input_features"]))
+    callbacks.append(optim_callbacks)
 
-        full_config = set_config_loss(full_config, full_config["setup"]["trainable"])
-        configure_model_weights(model, full_config["setup"]["trainable"])
-        model.build((1, full_config["dataset"]["padded_num_elem_size"], full_config["dataset"]["num_input_features"]))
+    tune_report_checkpoint_callback = TuneReportCheckpointCallback(
+        metrics=[
+            "adam_beta_1",
+            "charge_loss",
+            "cls_acc_unweighted",
+            "cls_loss",
+            "cos_phi_loss",
+            "energy_loss",
+            "eta_loss",
+            "learning_rate",
+            "loss",
+            "pt_loss",
+            "sin_phi_loss",
+            "val_charge_loss",
+            "val_cls_acc_unweighted",
+            "val_cls_acc_weighted",
+            "val_cls_loss",
+            "val_cos_phi_loss",
+            "val_energy_loss",
+            "val_eta_loss",
+            "val_loss",
+            "val_pt_loss",
+            "val_sin_phi_loss",
+        ],
+    )
 
-        loss_dict, loss_weights = get_loss_dict(full_config)
-        model.compile(
-            loss=loss_dict,
-            optimizer=opt,
-            sample_weight_mode="temporal",
-            loss_weights=loss_weights,
-            metrics={
-                "cls": [
-                    FlattenedCategoricalAccuracy(name="acc_unweighted", dtype=tf.float64),
-                    FlattenedCategoricalAccuracy(use_weights=True, name="acc_weighted", dtype=tf.float64),
-                ]
-            },
-        )
-        model.summary()
+    # To make TuneReportCheckpointCallback continue the numbering of checkpoints correctly
+    if weights is not None:
+        latest_saved_checkpoint_number = int(Path(weights).name.split("-")[1])
+        print("INFO: setting TuneReportCheckpointCallback epoch number to {}".format(latest_saved_checkpoint_number))
+        tune_report_checkpoint_callback._checkpoint._counter = Counter()
+        tune_report_checkpoint_callback._checkpoint._counter["epoch_end"] = latest_saved_checkpoint_number
+        tune_report_checkpoint_callback._checkpoint._cp_count = latest_saved_checkpoint_number
+    callbacks.append(tune_report_checkpoint_callback)
 
 
-        callbacks.append(
-            TuneReportCheckpointCallback(
-                metrics=[
-                    "adam_beta_1",
-                    "charge_loss",
-                    "cls_acc_unweighted",
-                    "cls_loss",
-                    "cos_phi_loss",
-                    "energy_loss",
-                    "eta_loss",
-                    "learning_rate",
-                    "loss",
-                    "pt_loss",
-                    "sin_phi_loss",
-                    "val_charge_loss",
-                    "val_cls_acc_unweighted",
-                    "val_cls_acc_weighted",
-                    "val_cls_loss",
-                    "val_cos_phi_loss",
-                    "val_energy_loss",
-                    "val_eta_loss",
-                    "val_loss",
-                    "val_pt_loss",
-                    "val_sin_phi_loss",
-                ],
-            ),
+    try:
+        model.fit(
+            ds_train.repeat(),
+            validation_data=ds_test.repeat(),
+            epochs=full_config["setup"]["num_epochs"],
+            callbacks=callbacks,
+            steps_per_epoch=num_train_steps,
+            validation_steps=num_test_steps,
+            initial_epoch=initial_epoch,
         )
-
-        try:
-            model.fit(
-                ds_train.repeat(),
-                validation_data=ds_test.repeat(),
-                epochs=full_config["setup"]["num_epochs"],
-                callbacks=callbacks,
-                steps_per_epoch=num_train_steps,
-                validation_steps=num_test_steps,
-            )
-        except tf.errors.ResourceExhaustedError:
-            logging.warning("Resource exhausted, skipping this hyperparameter configuration.")
-            skiplog_file_path = Path(full_config["raytune"]["local_dir"]) / name / "skipped_configurations.txt"
-            lines = ["{}: {}\n".format(item[0], item[1]) for item in config.items()]
-
-            with open(skiplog_file_path, "a") as f:
-                f.write("#" * 80 + "\n")
-                for line in lines:
-                    f.write(line)
-                    logging.warning(line[:-1])
-                f.write("#" * 80 + "\n\n")
+    except tf.errors.ResourceExhaustedError:
+        logging.warning("Resource exhausted, skipping this hyperparameter configuration.")
+        skiplog_file_path = Path(full_config["raytune"]["local_dir"]) / name / "skipped_configurations.txt"
+        lines = ["{}: {}\n".format(item[0], item[1]) for item in config.items()]
+
+        with open(skiplog_file_path, "a") as f:
+            f.write("#" * 80 + "\n")
+            for line in lines:
+                f.write(line)
+                logging.warning(line[:-1])
+            f.write("#" * 80 + "\n\n")
 
 
 @main.command()
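The except branch above implements a simple "skip log": trials that exhaust GPU memory are recorded to a text file instead of aborting the whole hyperparameter search. A self-contained sketch of the same pattern follows; the path and the example config are illustrative, not from the patch.

    # Self-contained sketch of the skip-log pattern; illustrative only.
    import logging
    from pathlib import Path

    def log_skipped_config(config, skiplog_file_path):
        lines = ["{}: {}\n".format(k, v) for k, v in config.items()]
        with open(skiplog_file_path, "a") as f:
            f.write("#" * 80 + "\n")
            for line in lines:
                f.write(line)
                logging.warning(line[:-1])
            f.write("#" * 80 + "\n\n")

    if __name__ == "__main__":
        log_skipped_config({"lr": 1e-3, "batch_size": 256}, Path("skipped_configurations.txt"))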
diff --git a/mlpf/raytune/search_space.py b/mlpf/raytune/search_space.py
index 4301e9701..19273ce5e 100644
--- a/mlpf/raytune/search_space.py
+++ b/mlpf/raytune/search_space.py
@@ -22,6 +22,21 @@
     "clip_value_low": samp([0.0]),
     "normalize_degrees": samp([True]),
     "output_dim": samp([128]),
+    # "event_loss": samp(["none", "sliced_wasserstein", "gen_jet_logcosh", "hist_2d"]),
+    # "met_loss": samp([
+    #     "none",
+    #     {"type": "Huber", "delta": 10.0}
+    # ]),
+    "event_and_met_loss": samp(
+        [
+            ("none", "none"),
+            ("sliced_wasserstein", "none"),
+            ("gen_jet_logcosh", "none"),
+            ("hist_2d", "none"),
+            ("none", "met"),
+        ]
+    ),
+    # "mask_reg_cls0": samp([False, True]),
 }
 
 # search_space = {
@@ -108,6 +123,40 @@ def set_raytune_search_parameters(search_space, config):
     if "expdecay_decay_rate" in search_space.keys():
         config["exponentialdecay"]["decay_rate"] = search_space["expdecay_decay_rate"]
 
+    if "event_loss" in search_space.keys():
+        config["loss"]["event_loss"] = search_space["event_loss"]
+        if search_space["event_loss"] == "none":
+            config["loss"]["event_loss_coef"] = 0.0
+        else:
+            config["loss"]["event_loss_coef"] = 1.0
+
+    if "met_loss" in search_space.keys():
+        config["loss"]["met_loss"] = search_space["event_loss"]
+        if search_space["met_loss"] == "none":
+            config["loss"]["met_loss_coef"] = 0.0
+        else:
+            config["loss"]["met_loss_coef"] = 1.0
+
+    if "event_and_met_loss" in search_space.keys():
+        event_l, met_l = search_space["event_and_met_loss"]
+
+        config["loss"]["event_loss"] = event_l
+
+        if event_l == "none":
+            config["loss"]["event_loss_coef"] = 0.0
+        else:
+            config["loss"]["event_loss_coef"] = 1.0
+
+        if met_l == "none":
+            config["loss"]["met_loss"] = met_l
+            config["loss"]["met_loss_coef"] = 0.0
+        else:
+            config["loss"]["met_loss"] = {"type": "Huber", "delta": 10.0}
+            config["loss"]["met_loss_coef"] = 1.0
+
+    if "mask_reg_cls0" in search_space.keys():
+        config["parameters"]["output_decoding"]["mask_reg_cls0"] = search_space["mask_reg_cls0"]
+
     if "lr_schedule" in search_space.keys():
         config["setup"]["lr_schedule"] = search_space["lr_schedule"]
 
diff --git a/mlpf/tfmodel/model_setup.py b/mlpf/tfmodel/model_setup.py
index c0dfdbc1d..9d99a04bc 100644
--- a/mlpf/tfmodel/model_setup.py
+++ b/mlpf/tfmodel/model_setup.py
@@ -32,7 +32,10 @@ def on_epoch_end(self, epoch, logs=None):
         weightfile_path = self.opt_path.format(epoch=epoch + 1, **logs)
         try:
             # PCGrad is derived from the legacy optimizer
-            if self.model.optimizer.__class__.__module__ == "keras.optimizers.optimizer_v1":
+            # module name differs in different TF versions
+            if (self.model.optimizer.__class__.__module__ == "keras.optimizers.optimizer_v1") or (
+                self.model.optimizer.__class__.__module__ == "keras.optimizer_v1"
+            ):
                 # lr = self.model.optimizer.optimizer.optimizer.lr
                 weights = self.model.optimizer.optimizer.optimizer.get_weights()
             else:
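The combined event_and_met_loss search parameter above maps one sampled tuple onto several config keys. Below is a condensed, standalone restatement of that mapping, operating on a bare dict rather than the full training config; it is a sketch for illustration, not the repository's function.

    # Condensed restatement of the mapping added above; standalone sketch.
    def apply_event_and_met_loss(config, choice):
        event_l, met_l = choice
        loss = config.setdefault("loss", {})
        loss["event_loss"] = event_l
        loss["event_loss_coef"] = 0.0 if event_l == "none" else 1.0
        if met_l == "none":
            loss["met_loss"] = met_l
            loss["met_loss_coef"] = 0.0
        else:
            loss["met_loss"] = {"type": "Huber", "delta": 10.0}
            loss["met_loss_coef"] = 1.0
        return config

    print(apply_event_and_met_loss({}, ("sliced_wasserstein", "none"))["loss"])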
diff --git a/mlpf/tfmodel/utils.py b/mlpf/tfmodel/utils.py
index 175448b75..6ecde2a7e 100644
--- a/mlpf/tfmodel/utils.py
+++ b/mlpf/tfmodel/utils.py
@@ -69,7 +69,7 @@ def parse_config(config, ntrain=None, ntest=None, nepochs=None, weights=None):
     if "multi_output" not in config["setup"]:
         config["setup"]["multi_output"] = True
 
-    if weights is None:
+    if weights is not None:
         config["setup"]["weights"] = weights
 
     return config, config_file_stem
@@ -92,11 +92,19 @@ def create_experiment_dir(prefix=None, suffix=None):
 def get_best_checkpoint(train_dir):
     checkpoint_list = list(Path(Path(train_dir) / "weights").glob("weights*.hdf5"))
     # Sort the checkpoints according to the loss in their filenames
-    checkpoint_list.sort(key=lambda x: float(re.search("\d+-\d+.\d+", str(x))[0].split("-")[-1]))
+    checkpoint_list.sort(key=lambda x: float(re.search("\d+-\d+.\d+", str(x.name))[0].split("-")[-1]))
     # Return the checkpoint with smallest loss
     return str(checkpoint_list[0])
 
 
+def get_latest_checkpoint(train_dir):
+    checkpoint_list = list(Path(Path(train_dir) / "weights").glob("weights*.hdf5"))
+    # Sort the checkpoints according to the epoch number in their filenames
+    checkpoint_list.sort(key=lambda x: int(re.search("\d+-\d+.\d+", str(x.name))[0].split("-")[0]))
+    # Return the checkpoint with highest epoch number
+    return str(checkpoint_list[-1])
+
+
 def delete_all_but_best_checkpoint(train_dir, dry_run):
     checkpoint_list = list(Path(Path(train_dir) / "weights").glob("weights*.hdf5"))
     # Don't remove the checkpoint with smallest loss
diff --git a/mlpf/tfmodel/utils_analysis.py b/mlpf/tfmodel/utils_analysis.py
index 6a4739b99..b59a57eec 100644
--- a/mlpf/tfmodel/utils_analysis.py
+++ b/mlpf/tfmodel/utils_analysis.py
@@ -21,7 +21,7 @@ def func(key):
 
 def plot_ray_analysis(analysis, save=False, skip=0):
     to_plot = [
-        # 'adam_beta_1',
+        "adam_beta_1",
         "charge_loss",
         "cls_acc_unweighted",
         "cls_loss",
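The two helpers in mlpf/tfmodel/utils.py above select checkpoints by parsing "weights-<epoch>-<loss>.hdf5" filenames: the best checkpoint is the one with the smallest loss, the latest the one with the highest epoch number. The following simplified standalone sketch combines both selections; the regex is escaped here, whereas the patch keeps the repository's original pattern, and the function name is illustrative.

    # Simplified sketch of checkpoint selection by filename; illustrative only.
    import re
    from pathlib import Path

    def best_and_latest_checkpoint(train_dir):
        ckpts = list((Path(train_dir) / "weights").glob("weights*.hdf5"))

        def fields(p):
            # "weights-12-3.456.hdf5" -> ["12", "3.456"]
            return re.search(r"\d+-\d+\.\d+", p.name)[0].split("-")

        best = min(ckpts, key=lambda p: float(fields(p)[-1]))  # smallest loss
        latest = max(ckpts, key=lambda p: int(fields(p)[0]))  # highest epoch
        return str(best), str(latest)

    # Example: best_and_latest_checkpoint("experiments/my_run") ->
    # (".../weights-03-2.101.hdf5", ".../weights-17-2.345.hdf5")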