Ray Tune checkpointing fix, allow LR schedules for non-PCGrad opt, and more. #142

Merged: 34 commits, Sep 20, 2022

Commits
1207b06  Merge pull request #1 from jpata/master (erwulff, Jul 9, 2021)
12fa88d  Merge pull request #2 from jpata/master (erwulff, Jul 23, 2021)
827c110  Merge branch 'jpata:master' into master (erwulff, Aug 4, 2021)
e8a44d3  Merge pull request #3 from jpata/master (erwulff, Sep 3, 2021)
f87b3ad  Merge branch 'jpata:master' into master (erwulff, Sep 15, 2021)
9c8502a  Merge pull request #4 from jpata/master (erwulff, Oct 8, 2021)
2126c38  Merge branch 'jpata:master' into master (erwulff, Nov 2, 2021)
ab9a4b9  Merge branch 'jpata:master' into master (erwulff, Nov 15, 2021)
c86a23f  Merge branch 'jpata:master' into master (erwulff, Dec 7, 2021)
c944d87  Merge branch 'jpata:master' into master (erwulff, Dec 7, 2021)
1c6189f  Merge pull request #6 from jpata/master (erwulff, Mar 11, 2022)
43cca5d  Merge branch 'jpata:master' into master (erwulff, Apr 29, 2022)
85ef3ba  Merge branch 'jpata:master' into master (erwulff, Apr 29, 2022)
d8e0aac  Merge branch 'jpata:master' into master (erwulff, May 2, 2022)
f132bbc  Merge branch 'jpata:master' into master (erwulff, May 4, 2022)
65179c6  Merge branch 'jpata:master' into master (erwulff, May 6, 2022)
914a709  Merge branch 'jpata:master' into master (erwulff, May 20, 2022)
830c012  Merge branch 'jpata:master' into master (erwulff, Jul 13, 2022)
59df47a  Merge branch 'master' of github.com:erwulff/particleflow (erwulff, Aug 15, 2022)
9adbe9d  Merge branch 'master' of github.com:erwulff/particleflow (erwulff, Aug 17, 2022)
0820179  Merge branch 'master' of github.com:erwulff/particleflow (erwulff, Aug 30, 2022)
0957c5d  Merge branch 'master' of github.com:erwulff/particleflow (erwulff, Sep 13, 2022)
738c094  Merge branch 'master' of github.com:erwulff/particleflow (erwulff, Sep 15, 2022)
3e77186  feat: add option to include SLURM jobid name in training dir (erwulff, Sep 15, 2022)
c63e8e7  feat: add command-line option to enable horovod (erwulff, Sep 15, 2022)
afa0c8f  feat: Use comet offline logging in Ray Tune runs (erwulff, Sep 15, 2022)
392954b  fix: bug in raytune command (erwulff, Sep 15, 2022)
d961a15  fix: handle TF version-dependent names of the legacy optimizer (erwulff, Sep 15, 2022)
f2effd5  feat: add event and met losses to raytune search space (erwulff, Sep 15, 2022)
db774dc  feat: added sbatch script for Horovod training on JURECA (erwulff, Sep 15, 2022)
3ac1094  fix: Ray Tune checkpoint saving and loading (erwulff, Sep 15, 2022)
b42c462  feat: allow lr schedules when not using PCGrad (erwulff, Sep 15, 2022)
c74ecc2  chore: add print of loaded opt weights (erwulff, Sep 16, 2022)
9efab4a  fix: handle TF version-dependent names of the legacy optimizer (erwulff, Sep 16, 2022)
62 changes: 62 additions & 0 deletions mlpf/jureca/horovod_pipeline_train.sh
@@ -0,0 +1,62 @@
#!/bin/sh

#SBATCH --account=raise-ctp2
#SBATCH --partition=dc-gpu
#SBATCH --time 24:00:00
#SBATCH --nodes 1
#SBATCH --tasks-per-node=1
#SBATCH --gres=gpu:4

# Job name
#SBATCH -J pipehorovod

# Output and error logs
#SBATCH -o logs_slurm/log_%x_%j.out
#SBATCH -e logs_slurm/log_%x_%j.err

# Add jobscript to job output
echo "#################### Job submission script. #############################"
cat $0
echo "################# End of job submission script. #########################"


module --force purge
module load Stages/2022
module load GCC GCCcore/.11.2.0 CMake NCCL CUDA cuDNN OpenMPI

export CUDA_VISIBLE_DEVICES=0,1,2,3
jutil env activate -p raise-ctp2

sleep 1
nvidia-smi

source /p/project/raise-ctp2/cern/miniconda3/bin/activate tf2
echo "Python used:"
which python3
python3 --version


echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: SLURM_NODEID: $SLURM_NODEID"
echo "DEBUG: SLURM_LOCALID: $SLURM_LOCALID"
echo "DEBUG: SLURM_PROCID: $SLURM_PROCID"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"


export NCCL_DEBUG=INFO
export OMP_NUM_THREADS=1
if [ "$SLURM_CPUS_PER_TASK" > 0 ] ; then
export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
fi
echo $OMP_NUM_THREADS


echo 'Starting training.'
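# Positional arguments: $1 is the training config YAML passed to -c, $2 the output
# prefix passed to -p. --comet-offline logs the comet-ml experiment locally,
# -j records the SLURM job ID in the training directory, and -m enables
# Horovod-based multi-node training (see the click options in mlpf/pipeline.py).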
srun --cpu-bind=none python mlpf/pipeline.py train -c $1 -p $2 --comet-offline -j $SLURM_JOBID -m
echo 'Training done.'
184 changes: 98 additions & 86 deletions mlpf/pipeline.py
@@ -37,6 +37,7 @@
get_best_checkpoint,
get_datasets,
get_heptfds_dataset,
get_latest_checkpoint,
get_loss_dict,
get_lr_schedule,
get_optimizer,
@@ -87,13 +88,17 @@ def main():
@click.option("--plot-freq", default=None, help="plot detailed validation every N epochs", type=int)
@click.option("--customize", help="customization function", type=str, default=None)
@click.option("--comet-offline", help="log comet-ml experiment locally", is_flag=True)
def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq, customize, comet_offline):

@click.option("-j", "--jobid", help="log log the Slurm job ID in experiments dir", type=str, default=None)
@click.option("-m", "--horovod_enabled", help="Enable multi-node training using Horovod", is_flag=True)
def train(
config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq, customize, comet_offline, jobid, horovod_enabled
):
# tf.debugging.enable_check_numerics()

"""Train a model defined by config"""
config_file_path = config
config, config_file_stem = parse_config(config, nepochs=nepochs, weights=weights)
print(f"loaded config file: {config_file_path}")

if plot_freq:
config["callbacks"]["plot_freq"] = plot_freq
@@ -102,7 +107,10 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
config = customization_functions[customize](config)

# Decide tf.distribute.strategy depending on number of available GPUs
horovod_enabled = config["setup"]["horovod_enabled"]
if horovod_enabled:
pass
else:
horovod_enabled = config["setup"]["horovod_enabled"]
if horovod_enabled:
num_gpus = initialize_horovod()
else:
@@ -146,6 +154,10 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
experiment.log_code("mlpf/tfmodel/utils.py")
experiment.log_code(config_file_path)

if jobid is not None:
with open(f"{outdir}/{jobid}.txt", "w") as f:
f.write(f"{jobid}\n")

ds_train, num_train_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "train")
ds_test, num_test_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "test")
ds_val, ds_info = get_heptfds_dataset(
@@ -176,9 +188,7 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
with strategy.scope():
model, optim_callbacks, initial_epoch = model_scope(config, total_steps, weights)

callbacks = prepare_callbacks(
config, outdir, ds_val, comet_experiment=experiment, horovod_enabled=config["setup"]["horovod_enabled"]
)
callbacks = prepare_callbacks(config, outdir, ds_val, comet_experiment=experiment, horovod_enabled=horovod_enabled)

verbose = 1
if horovod_enabled:
@@ -194,7 +204,7 @@ def train(config, weights, ntrain, ntest, nepochs, recreate, prefix, plot_freq,
model.fit(
ds_train.repeat(),
validation_data=ds_test.repeat(),
epochs=initial_epoch + config["setup"]["num_epochs"],
epochs=config["setup"]["num_epochs"],
Owner: why did you change this? I think it was correct.

Collaborator Author: My thinking is the following. Let's say config["setup"]["num_epochs"] is 100 and we resume an interrupted training from epoch 20. Then initial_epoch will be 20 and initial_epoch + config["setup"]["num_epochs"] will be 120, right? I think it's more intuitive that config["setup"]["num_epochs"] should be the total number of epochs to run before completing the training, rather than the additional number of epochs to run from the resumed point. This is a matter of taste, I suppose. What do you think?
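For readers unfamiliar with the Keras semantics being discussed, a minimal, self-contained sketch (not part of the PR, with hypothetical toy values) of how epochs and initial_epoch interact:

import tensorflow as tf

# Toy model and data, only to make the epoch arithmetic concrete.
model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
model.compile(optimizer="sgd", loss="mse")
x = tf.random.normal((8, 4))
y = tf.random.normal((8, 1))

num_epochs = 100    # plays the role of config["setup"]["num_epochs"]
initial_epoch = 20  # epoch parsed from the checkpoint filename when resuming

# Behaviour after this PR: epochs is the total count, so Keras trains
# epochs 20..99 and the whole run amounts to 100 epochs.
model.fit(x, y, epochs=num_epochs, initial_epoch=initial_epoch, verbose=0)

# Previous behaviour: epochs=initial_epoch + num_epochs would train
# epochs 20..119, i.e. always 100 additional epochs beyond the resume point.
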
callbacks=callbacks,
steps_per_epoch=num_train_steps,
validation_steps=num_test_steps,
@@ -220,7 +230,7 @@ def model_save(outdir, fit_result, model, weights):
print("Training done.")


def model_scope(config, total_steps, weights, horovod_enabled=False):
def model_scope(config, total_steps, weights=None, horovod_enabled=False):
lr_schedule, optim_callbacks, lr = get_lr_schedule(config, steps=total_steps)
opt = get_optimizer(config, lr_schedule)

@@ -241,15 +251,17 @@ def model_scope(config, total_steps, weights, horovod_enabled=False):
loaded_opt = None

if weights:
if lr_schedule:
raise Exception("Restoring the optimizer state with a learning rate schedule is currently not supported")
if lr_schedule and (opt.__class__.__module__ == "tfmodel.PCGrad_tf"):
raise Exception("Restoring the PCGrad optimizer state with a learning rate schedule is currently not supported")

# We need to load the weights in the same trainable configuration as the model was set up
configure_model_weights(model, config["setup"].get("weights_config", "all"))
model.load_weights(weights, by_name=True)
print("INFO: using checkpointed model weights from: {}".format(weights))
opt_weight_file = weights.replace("hdf5", "pkl").replace("/weights-", "/opt-")
if os.path.isfile(opt_weight_file):
loaded_opt = pickle.load(open(opt_weight_file, "rb"))
print("INFO: using checkpointed optimizer weights from: {}".format(opt_weight_file))

initial_epoch = int(weights.split("/")[-1].split("-")[1])
model.build((1, config["dataset"]["padded_num_elem_size"], config["dataset"]["num_input_features"]))
@@ -291,7 +303,9 @@ def model_weight_setting():
grad_vars = model.trainable_weights
zero_grads = [tf.zeros_like(w) for w in grad_vars]
model.optimizer.apply_gradients(zip(zero_grads, grad_vars))
if model.optimizer.__class__.__module__ == "keras.optimizers.optimizer_v1":
if (model.optimizer.__class__.__module__ == "keras.optimizers.optimizer_v1") or (
model.optimizer.__class__.__module__ == "keras.optimizer_v1"
):
model.optimizer.optimizer.optimizer.set_weights(loaded_opt["weights"])
else:
model.optimizer.set_weights(loaded_opt["weights"])
@@ -571,6 +585,8 @@ def hypertune(config, outdir, ntrain, ntest, recreate):


def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=None, ntest=None, name=None, seeds=False):
from collections import Counter

from ray import tune
from ray.tune.integration.keras import TuneReportCheckpointCallback
from raytune.search_space import set_raytune_search_parameters
@@ -586,6 +602,17 @@ def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=
if config is not None:
full_config = set_raytune_search_parameters(search_space=config, config=full_config)

print("Using comet-ml OfflineExperiment, saving logs locally.")
experiment = OfflineExperiment(
project_name="particleflow-tf-gen",
auto_metric_logging=True,
auto_param_logging=True,
auto_histogram_weight_logging=True,
auto_histogram_gradient_logging=False,
auto_histogram_activation_logging=False,
offline_directory=tune.get_trial_dir() + "/cometml",
)

strategy, num_gpus = get_strategy()

ds_train, num_train_steps = get_datasets(full_config["train_test_datasets"], full_config, num_gpus, "train")
@@ -612,91 +639,76 @@ def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=
total_steps = num_train_steps * full_config["setup"]["num_epochs"]
print("total_steps", total_steps)

with strategy.scope():
weights = get_latest_checkpoint(Path(checkpoint_dir).parent) if (checkpoint_dir is not None) else None
model, optim_callbacks, initial_epoch = model_scope(full_config, total_steps, weights=weights)

callbacks = prepare_callbacks(
full_config,
tune.get_trial_dir(),
ds_val,
comet_experiment=experiment,
horovod_enabled=False,
)

callbacks = callbacks[:-1] # remove the CustomCallback at the end of the list

with strategy.scope():
lr_schedule, optim_callbacks = get_lr_schedule(full_config, steps=total_steps)
callbacks.append(optim_callbacks)
opt = get_optimizer(full_config, lr_schedule)

model = make_model(full_config, dtype=tf.dtypes.float32)

# Run model once to build the layers
model.build((1, full_config["dataset"]["padded_num_elem_size"], full_config["dataset"]["num_input_features"]))
callbacks.append(optim_callbacks)

full_config = set_config_loss(full_config, full_config["setup"]["trainable"])
configure_model_weights(model, full_config["setup"]["trainable"])
model.build((1, full_config["dataset"]["padded_num_elem_size"], full_config["dataset"]["num_input_features"]))
tune_report_checkpoint_callback = TuneReportCheckpointCallback(
metrics=[
"adam_beta_1",
"charge_loss",
"cls_acc_unweighted",
"cls_loss",
"cos_phi_loss",
"energy_loss",
"eta_loss",
"learning_rate",
"loss",
"pt_loss",
"sin_phi_loss",
"val_charge_loss",
"val_cls_acc_unweighted",
"val_cls_acc_weighted",
"val_cls_loss",
"val_cos_phi_loss",
"val_energy_loss",
"val_eta_loss",
"val_loss",
"val_pt_loss",
"val_sin_phi_loss",
],
)

loss_dict, loss_weights = get_loss_dict(full_config)
model.compile(
loss=loss_dict,
optimizer=opt,
sample_weight_mode="temporal",
loss_weights=loss_weights,
metrics={
"cls": [
FlattenedCategoricalAccuracy(name="acc_unweighted", dtype=tf.float64),
FlattenedCategoricalAccuracy(use_weights=True, name="acc_weighted", dtype=tf.float64),
]
},
)
model.summary()
# To make TuneReportCheckpointCallback continue the numbering of checkpoints correctly
if weights is not None:
latest_saved_checkpoint_number = int(Path(weights).name.split("-")[1])
print("INFO: setting TuneReportCheckpointCallback epoch number to {}".format(latest_saved_checkpoint_number))
tune_report_checkpoint_callback._checkpoint._counter = Counter()
tune_report_checkpoint_callback._checkpoint._counter["epoch_end"] = latest_saved_checkpoint_number
tune_report_checkpoint_callback._checkpoint._cp_count = latest_saved_checkpoint_number
callbacks.append(tune_report_checkpoint_callback)

callbacks.append(
TuneReportCheckpointCallback(
metrics=[
"adam_beta_1",
"charge_loss",
"cls_acc_unweighted",
"cls_loss",
"cos_phi_loss",
"energy_loss",
"eta_loss",
"learning_rate",
"loss",
"pt_loss",
"sin_phi_loss",
"val_charge_loss",
"val_cls_acc_unweighted",
"val_cls_acc_weighted",
"val_cls_loss",
"val_cos_phi_loss",
"val_energy_loss",
"val_eta_loss",
"val_loss",
"val_pt_loss",
"val_sin_phi_loss",
],
),
try:
model.fit(
ds_train.repeat(),
validation_data=ds_test.repeat(),
epochs=full_config["setup"]["num_epochs"],
callbacks=callbacks,
steps_per_epoch=num_train_steps,
validation_steps=num_test_steps,
initial_epoch=initial_epoch,
)

try:
model.fit(
ds_train.repeat(),
validation_data=ds_test.repeat(),
epochs=full_config["setup"]["num_epochs"],
callbacks=callbacks,
steps_per_epoch=num_train_steps,
validation_steps=num_test_steps,
)
except tf.errors.ResourceExhaustedError:
logging.warning("Resource exhausted, skipping this hyperparameter configuration.")
skiplog_file_path = Path(full_config["raytune"]["local_dir"]) / name / "skipped_configurations.txt"
lines = ["{}: {}\n".format(item[0], item[1]) for item in config.items()]

with open(skiplog_file_path, "a") as f:
f.write("#" * 80 + "\n")
for line in lines:
f.write(line)
logging.warning(line[:-1])
f.write("#" * 80 + "\n\n")
except tf.errors.ResourceExhaustedError:
logging.warning("Resource exhausted, skipping this hyperparameter configuration.")
skiplog_file_path = Path(full_config["raytune"]["local_dir"]) / name / "skipped_configurations.txt"
lines = ["{}: {}\n".format(item[0], item[1]) for item in config.items()]

with open(skiplog_file_path, "a") as f:
f.write("#" * 80 + "\n")
for line in lines:
f.write(line)
logging.warning(line[:-1])
f.write("#" * 80 + "\n\n")


@main.command()