Ray Tune checkpointing fix, allow LR schedules for non-PCGrad opt, and more. (#142)

* feat: add option to include SLURM jobid name in training dir

* feat: add command-line option to enable horovod

* feat: Use comet offline logging in Ray Tune runs

* fix: bug in raytune command

* fix: handle TF version-dependent names of the legacy optimizer

* feat: add event and met losses to raytune search space

* feat: added sbatch script for Horovod training on JURECA

* fix: Ray Tune checkpoint saving and loading

* feat: allow lr schedules when not using PCGrad

* chore: add print of loaded opt weights

* fix: handle TF version-dependent names of the legacy optimizer

Former-commit-id: deb05ea
erwulff authored Sep 20, 2022
1 parent ba0013d commit 49cba23
Showing 6 changed files with 224 additions and 90 deletions.
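
The new command-line options added to mlpf/pipeline.py train in this commit (--comet-offline, -j/--jobid, -m) can be combined in a single invocation. A minimal sketch, mirroring the srun line in the JURECA script below; the config path and output prefix are placeholders, not files from this repository:

# Placeholder paths; --comet-offline logs the comet-ml experiment locally,
# -j records the Slurm job ID in the training dir, -m enables Horovod.
python3 mlpf/pipeline.py train -c path/to/config.yaml -p experiments/ --comet-offline -j $SLURM_JOBID -m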
62 changes: 62 additions & 0 deletions mlpf/jureca/horovod_pipeline_train.sh
@@ -0,0 +1,62 @@
#!/bin/bash

#SBATCH --account=raise-ctp2
#SBATCH --partition=dc-gpu
#SBATCH --time 24:00:00
#SBATCH --nodes 1
#SBATCH --tasks-per-node=1
#SBATCH --gres=gpu:4

# Job name
#SBATCH -J pipehorovod

# Output and error logs
#SBATCH -o logs_slurm/log_%x_%j.out
#SBATCH -e logs_slurm/log_%x_%j.err

# Add jobscript to job output
echo "#################### Job submission script. #############################"
cat $0
echo "################# End of job submission script. #########################"


module --force purge
module load Stages/2022
module load GCC GCCcore/.11.2.0 CMake NCCL CUDA cuDNN OpenMPI

export CUDA_VISIBLE_DEVICES=0,1,2,3
jutil env activate -p raise-ctp2

sleep 1
nvidia-smi

source /p/project/raise-ctp2/cern/miniconda3/bin/activate tf2
echo "Python used:"
which python3
python3 --version


echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: SLURM_NODEID: $SLURM_NODEID"
echo "DEBUG: SLURM_LOCALID: $SLURM_LOCALID"
echo "DEBUG: SLURM_PROCID: $SLURM_PROCID"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"


export NCCL_DEBUG=INFO
export OMP_NUM_THREADS=1
if [ "${SLURM_CPUS_PER_TASK:-0}" -gt 0 ] ; then
export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
fi
echo $OMP_NUM_THREADS


echo 'Starting training.'
srun --cpu-bind=none python mlpf/pipeline.py train -c $1 -p $2 --comet-offline -j $SLURM_JOBID -m
echo 'Training done.'
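
A minimal submission sketch for this script, assuming the first positional argument is the training config and the second the training-dir prefix, as consumed by the srun line above; both paths are placeholders:

# The SBATCH -o/-e paths above expect logs_slurm/ to exist in the submission directory.
mkdir -p logs_slurm
sbatch mlpf/jureca/horovod_pipeline_train.sh path/to/config.yaml path/to/train_dir_prefix/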
184 changes: 98 additions & 86 deletions mlpf/pipeline.py
@@ -37,6 +37,7 @@
get_best_checkpoint,
get_datasets,
get_heptfds_dataset,
get_latest_checkpoint,
get_loss_dict,
get_lr_schedule,
get_optimizer,
@@ -87,13 +88,17 @@ def main():
@click.option("--plot-freq", default=None, help="plot detailed validation every N epochs", type=int)
@click.option("--customize", help="customization function", type=str, default=None)
@click.option("--comet-offline", help="log comet-ml experiment locally", is_flag=True)
def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq, customize, comet_offline):

@click.option("-j", "--jobid", help="log the Slurm job ID in the experiments dir", type=str, default=None)
@click.option("-m", "--horovod_enabled", help="Enable multi-node training using Horovod", is_flag=True)
def train(
config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq, customize, comet_offline, jobid, horovod_enabled
):
# tf.debugging.enable_check_numerics()

"""Train a model defined by config"""
config_file_path = config
config, config_file_stem = parse_config(config, nepochs=nepochs, weights=weights)
print(f"loaded config file: {config_file_path}")

if plot_freq:
config["callbacks"]["plot_freq"] = plot_freq
@@ -102,7 +107,10 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
config = customization_functions[customize](config)

# Decide tf.distribute.strategy depending on number of available GPUs
horovod_enabled = config["setup"]["horovod_enabled"]
if horovod_enabled:
pass
else:
horovod_enabled = config["setup"]["horovod_enabled"]
if horovod_enabled:
num_gpus = initialize_horovod()
else:
@@ -146,6 +154,10 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
experiment.log_code("mlpf/tfmodel/utils.py")
experiment.log_code(config_file_path)

if jobid is not None:
with open(f"{outdir}/{jobid}.txt", "w") as f:
f.write(f"{jobid}\n")

ds_train, num_train_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "train")
ds_test, num_test_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "test")
ds_val, ds_info = get_heptfds_dataset(
@@ -176,9 +188,7 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
with strategy.scope():
model, optim_callbacks, initial_epoch = model_scope(config, total_steps, weights)

callbacks = prepare_callbacks(
config, outdir, ds_val, comet_experiment=experiment, horovod_enabled=config["setup"]["horovod_enabled"]
)
callbacks = prepare_callbacks(config, outdir, ds_val, comet_experiment=experiment, horovod_enabled=horovod_enabled)

verbose = 1
if horovod_enabled:
@@ -194,7 +204,7 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
model.fit(
ds_train.repeat(),
validation_data=ds_test.repeat(),
epochs=initial_epoch + config["setup"]["num_epochs"],
epochs=config["setup"]["num_epochs"],
callbacks=callbacks,
steps_per_epoch=num_train_steps,
validation_steps=num_test_steps,
@@ -220,7 +230,7 @@ def model_save(outdir, fit_result, model, weights):
print("Training done.")


def model_scope(config, total_steps, weights, horovod_enabled=False):
def model_scope(config, total_steps, weights=None, horovod_enabled=False):
lr_schedule, optim_callbacks, lr = get_lr_schedule(config, steps=total_steps)
opt = get_optimizer(config, lr_schedule)

@@ -241,15 +251,17 @@ def model_scope(config, total_steps, weights, horovod_enabled=False):
loaded_opt = None

if weights:
if lr_schedule:
raise Exception("Restoring the optimizer state with a learning rate schedule is currently not supported")
if lr_schedule and (opt.__class__.__module__ == "tfmodel.PCGrad_tf"):
raise Exception("Restoring the PCGrad optimizer state with a learning rate schedule is currently not supported")

# We need to load the weights in the same trainable configuration as the model was set up
configure_model_weights(model, config["setup"].get("weights_config", "all"))
model.load_weights(weights, by_name=True)
print("INFO: using checkpointed model weights from: {}".format(weights))
opt_weight_file = weights.replace("hdf5", "pkl").replace("/weights-", "/opt-")
if os.path.isfile(opt_weight_file):
loaded_opt = pickle.load(open(opt_weight_file, "rb"))
print("INFO: using checkpointed optimizer weights from: {}".format(opt_weight_file))

initial_epoch = int(weights.split("/")[-1].split("-")[1])
model.build((1, config["dataset"]["padded_num_elem_size"], config["dataset"]["num_input_features"]))
@@ -291,7 +303,9 @@ def model_weight_setting():
grad_vars = model.trainable_weights
zero_grads = [tf.zeros_like(w) for w in grad_vars]
model.optimizer.apply_gradients(zip(zero_grads, grad_vars))
if model.optimizer.__class__.__module__ == "keras.optimizers.optimizer_v1":
if (model.optimizer.__class__.__module__ == "keras.optimizers.optimizer_v1") or (
model.optimizer.__class__.__module__ == "keras.optimizer_v1"
):
model.optimizer.optimizer.optimizer.set_weights(loaded_opt["weights"])
else:
model.optimizer.set_weights(loaded_opt["weights"])
@@ -571,6 +585,8 @@ def hypertune(config, outdir, ntrain, ntest, recreate):


def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=None, ntest=None, name=None, seeds=False):
from collections import Counter

from ray import tune
from ray.tune.integration.keras import TuneReportCheckpointCallback
from raytune.search_space import set_raytune_search_parameters
@@ -586,6 +602,17 @@ def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=
if config is not None:
full_config = set_raytune_search_parameters(search_space=config, config=full_config)

print("Using comet-ml OfflineExperiment, saving logs locally.")
experiment = OfflineExperiment(
project_name="particleflow-tf-gen",
auto_metric_logging=True,
auto_param_logging=True,
auto_histogram_weight_logging=True,
auto_histogram_gradient_logging=False,
auto_histogram_activation_logging=False,
offline_directory=tune.get_trial_dir() + "/cometml",
)

strategy, num_gpus = get_strategy()

ds_train, num_train_steps = get_datasets(full_config["train_test_datasets"], full_config, num_gpus, "train")
@@ -612,91 +639,76 @@ def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=
total_steps = num_train_steps * full_config["setup"]["num_epochs"]
print("total_steps", total_steps)

with strategy.scope():
weights = get_latest_checkpoint(Path(checkpoint_dir).parent) if (checkpoint_dir is not None) else None
model, optim_callbacks, initial_epoch = model_scope(full_config, total_steps, weights=weights)

callbacks = prepare_callbacks(
full_config,
tune.get_trial_dir(),
ds_val,
comet_experiment=experiment,
horovod_enabled=False,
)

callbacks = callbacks[:-1] # remove the CustomCallback at the end of the list

with strategy.scope():
lr_schedule, optim_callbacks = get_lr_schedule(full_config, steps=total_steps)
callbacks.append(optim_callbacks)
opt = get_optimizer(full_config, lr_schedule)

model = make_model(full_config, dtype=tf.dtypes.float32)

# Run model once to build the layers
model.build((1, full_config["dataset"]["padded_num_elem_size"], full_config["dataset"]["num_input_features"]))
callbacks.append(optim_callbacks)

full_config = set_config_loss(full_config, full_config["setup"]["trainable"])
configure_model_weights(model, full_config["setup"]["trainable"])
model.build((1, full_config["dataset"]["padded_num_elem_size"], full_config["dataset"]["num_input_features"]))
tune_report_checkpoint_callback = TuneReportCheckpointCallback(
metrics=[
"adam_beta_1",
"charge_loss",
"cls_acc_unweighted",
"cls_loss",
"cos_phi_loss",
"energy_loss",
"eta_loss",
"learning_rate",
"loss",
"pt_loss",
"sin_phi_loss",
"val_charge_loss",
"val_cls_acc_unweighted",
"val_cls_acc_weighted",
"val_cls_loss",
"val_cos_phi_loss",
"val_energy_loss",
"val_eta_loss",
"val_loss",
"val_pt_loss",
"val_sin_phi_loss",
],
)

loss_dict, loss_weights = get_loss_dict(full_config)
model.compile(
loss=loss_dict,
optimizer=opt,
sample_weight_mode="temporal",
loss_weights=loss_weights,
metrics={
"cls": [
FlattenedCategoricalAccuracy(name="acc_unweighted", dtype=tf.float64),
FlattenedCategoricalAccuracy(use_weights=True, name="acc_weighted", dtype=tf.float64),
]
},
)
model.summary()
# To make TuneReportCheckpointCallback continue the numbering of checkpoints correctly
if weights is not None:
latest_saved_checkpoint_number = int(Path(weights).name.split("-")[1])
print("INFO: setting TuneReportCheckpointCallback epoch number to {}".format(latest_saved_checkpoint_number))
tune_report_checkpoint_callback._checkpoint._counter = Counter()
tune_report_checkpoint_callback._checkpoint._counter["epoch_end"] = latest_saved_checkpoint_number
tune_report_checkpoint_callback._checkpoint._cp_count = latest_saved_checkpoint_number
callbacks.append(tune_report_checkpoint_callback)

callbacks.append(
TuneReportCheckpointCallback(
metrics=[
"adam_beta_1",
"charge_loss",
"cls_acc_unweighted",
"cls_loss",
"cos_phi_loss",
"energy_loss",
"eta_loss",
"learning_rate",
"loss",
"pt_loss",
"sin_phi_loss",
"val_charge_loss",
"val_cls_acc_unweighted",
"val_cls_acc_weighted",
"val_cls_loss",
"val_cos_phi_loss",
"val_energy_loss",
"val_eta_loss",
"val_loss",
"val_pt_loss",
"val_sin_phi_loss",
],
),
try:
model.fit(
ds_train.repeat(),
validation_data=ds_test.repeat(),
epochs=full_config["setup"]["num_epochs"],
callbacks=callbacks,
steps_per_epoch=num_train_steps,
validation_steps=num_test_steps,
initial_epoch=initial_epoch,
)

try:
model.fit(
ds_train.repeat(),
validation_data=ds_test.repeat(),
epochs=full_config["setup"]["num_epochs"],
callbacks=callbacks,
steps_per_epoch=num_train_steps,
validation_steps=num_test_steps,
)
except tf.errors.ResourceExhaustedError:
logging.warning("Resource exhausted, skipping this hyperparameter configuration.")
skiplog_file_path = Path(full_config["raytune"]["local_dir"]) / name / "skipped_configurations.txt"
lines = ["{}: {}\n".format(item[0], item[1]) for item in config.items()]

with open(skiplog_file_path, "a") as f:
f.write("#" * 80 + "\n")
for line in lines:
f.write(line)
logging.warning(line[:-1])
f.write("#" * 80 + "\n\n")
except tf.errors.ResourceExhaustedError:
logging.warning("Resource exhausted, skipping this hyperparameter configuration.")
skiplog_file_path = Path(full_config["raytune"]["local_dir"]) / name / "skipped_configurations.txt"
lines = ["{}: {}\n".format(item[0], item[1]) for item in config.items()]

with open(skiplog_file_path, "a") as f:
f.write("#" * 80 + "\n")
for line in lines:
f.write(line)
logging.warning(line[:-1])
f.write("#" * 80 + "\n\n")


@main.command()
