From 7a46466c42f0b0709f2d9fbd42712f0408ddb4d0 Mon Sep 17 00:00:00 2001
From: Joosep Pata
Date: Mon, 21 Nov 2022 19:19:06 +0200
Subject: [PATCH] fix num_cpus flag (#150)

* fix the num_cpus flag to actually change the number of cpus

* fix eval, clic postprocessing
---
 mlpf/data_clic/postprocessing.py |  3 ++
 mlpf/pipeline.py                 | 57 +++++++++++++++++---------------
 mlpf/tfmodel/utils.py            | 46 ++++++++++++++++++--------
 notebooks/clic.ipynb             |  6 ++--
 4 files changed, 69 insertions(+), 43 deletions(-)

diff --git a/mlpf/data_clic/postprocessing.py b/mlpf/data_clic/postprocessing.py
index 0e61d4ea3..c6500ae76 100644
--- a/mlpf/data_clic/postprocessing.py
+++ b/mlpf/data_clic/postprocessing.py
@@ -168,6 +168,9 @@ def prepare_data_clic(fn):
         df_cl = data[iev]["clusters"]
         df_tr = data[iev]["tracks"]
         df_pfs = data[iev]["pfs"]
+        print("Clusters={}, tracks={}, PFs={}, Gen={}".format(len(df_cl), len(df_tr), len(df_pfs), len(df_gen)))
+        if len(df_pfs) < 10:
+            continue
 
         # compute pt, px,py,pz
         df_tr["pt"] = track_pt(df_tr["omega"])
diff --git a/mlpf/pipeline.py b/mlpf/pipeline.py
index 45c1964b0..e5ae2e0e9 100644
--- a/mlpf/pipeline.py
+++ b/mlpf/pipeline.py
@@ -145,12 +145,10 @@ def train(
         config = customization_functions[customize](config)
 
     # Decide tf.distribute.strategy depending on number of available GPUs
+    horovod_enabled = horovod_enabled or config["setup"]["horovod_enabled"]
+
     if horovod_enabled:
-        pass
-    else:
-        horovod_enabled = config["setup"]["horovod_enabled"]
-    if horovod_enabled:
-        num_gpus = initialize_horovod()
+        num_gpus, num_batches_multiplier = initialize_horovod()
     elif habana:
         import habana_frameworks.tensorflow as htf
 
@@ -160,9 +158,9 @@
         logging.info("Using habana_frameworks.tensorflow.distribute.HPUStrategy")
         strategy = HPUStrategy()
         num_gpus = 1
+        num_batches_multiplier = 1
     else:
-        strategy, num_gpus = get_strategy(num_cpus=num_cpus)
-        devices = num_gpus or num_cpus  # devices = num_cpus if num_gpus is 0 or None
+        strategy, num_gpus, num_batches_multiplier = get_strategy()
 
     outdir = ""
     if not horovod_enabled or hvd.rank() == 0:
@@ -206,12 +204,15 @@
         with open(f"{outdir}/{jobid}.txt", "w") as f:
             f.write(f"{jobid}\n")
 
-    ds_train, num_train_steps, train_samples = get_datasets(config["train_test_datasets"], config, devices, "train")
-    ds_test, num_test_steps, test_samples = get_datasets(config["train_test_datasets"], config, devices, "test")
+    ds_train, num_train_steps, train_samples = get_datasets(
+        config["train_test_datasets"], config, num_batches_multiplier, "train"
+    )
+    ds_test, num_test_steps, test_samples = get_datasets(
+        config["train_test_datasets"], config, num_batches_multiplier, "test"
+    )
     ds_val, ds_info = get_heptfds_dataset(
         config["validation_datasets"][0],
         config,
-        devices,
         "test",
         config["setup"]["num_events_validation"],
         supervised=False,
@@ -393,7 +394,11 @@ def initialize_horovod():
     if gpus:
         tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU")
 
-    return hvd.size()
+    num_batches_multiplier = 1
+    if hvd.size() > 1:
+        num_batches_multiplier = hvd.size()
+
+    return hvd.size(), num_batches_multiplier
 
 
 @main.command()
@@ -415,8 +420,8 @@ def compute_validation_loss(config, train_dir, weights):
     else:
         model_dtype = tf.dtypes.float32
 
-    strategy, num_gpus = get_strategy()
-    ds_test, num_test_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "test")
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
+    ds_test, num_test_steps = get_datasets(config["train_test_datasets"], config, num_batches_multiplier, "test")
 
     with strategy.scope():
         model = make_model(config, model_dtype)
@@ -471,7 +476,7 @@ def evaluate(config, train_dir, weights, customize, nevents):
     else:
         model_dtype = tf.dtypes.float32
 
-    strategy, num_gpus = get_strategy()
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
 
     # disable small graph optimization for onnx export (tf.cond is not well supported)
     if "small_graph_opt" in config["setup"]:
@@ -492,7 +497,7 @@ def evaluate(config, train_dir, weights, customize, nevents):
     iepoch = int(weights.split("/")[-1].split("-")[1])
 
     for dsname in config["validation_datasets"]:
-        ds_test, _ = get_heptfds_dataset(dsname, config, num_gpus, "test", supervised=False)
+        ds_test, _ = get_heptfds_dataset(dsname, config, "test", supervised=False)
         if nevents:
             ds_test = ds_test.take(nevents)
         ds_test = ds_test.padded_batch(config["validation_batch_size"])
@@ -515,9 +520,9 @@ def find_lr(config, outdir, figname, logscale):
     config, _ = parse_config(config)
 
     # Decide tf.distribute.strategy depending on number of available GPUs
-    strategy, num_gpus = get_strategy()
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
 
-    ds_train, num_train_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "train")
+    ds_train, num_train_steps = get_datasets(config["train_test_datasets"], config, num_batches_multiplier, "train")
 
     with strategy.scope():
         opt = tf.keras.optimizers.Adam(learning_rate=1e-7)  # This learning rate will be changed by the lr_finder
@@ -584,16 +589,13 @@ def hypertune(config, outdir, ntrain, ntest, recreate):
     if config["hypertune"]["algorithm"] == "hyperband":
         config["setup"]["num_epochs"] = config["hypertune"]["hyperband"]["max_epochs"]
 
-    strategy, num_gpus = get_strategy()
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
 
-    ds_train, ds_info = get_heptfds_dataset(
-        config["training_dataset"], config, num_gpus, "train", config["setup"]["num_events_train"]
-    )
-    ds_test, _ = get_heptfds_dataset(config["testing_dataset"], config, num_gpus, "test", config["setup"]["num_events_test"])
+    ds_train, ds_info = get_heptfds_dataset(config["training_dataset"], config, "train", config["setup"]["num_events_train"])
+    ds_test, _ = get_heptfds_dataset(config["testing_dataset"], config, "test", config["setup"]["num_events_test"])
     ds_val, _ = get_heptfds_dataset(
         config["validation_datasets"][0],
         config,
-        num_gpus,
         "test",
         config["setup"]["num_events_validation"],
         supervised=False,
@@ -666,14 +668,15 @@ def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=
            offline_directory=tune.get_trial_dir() + "/cometml",
        )
 
-    strategy, num_gpus = get_strategy()
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
 
-    ds_train, num_train_steps = get_datasets(full_config["train_test_datasets"], full_config, num_gpus, "train")
-    ds_test, num_test_steps = get_datasets(full_config["train_test_datasets"], full_config, num_gpus, "test")
+    ds_train, num_train_steps = get_datasets(
+        full_config["train_test_datasets"], full_config, num_batches_multiplier, "train"
+    )
+    ds_test, num_test_steps = get_datasets(full_config["train_test_datasets"], full_config, num_batches_multiplier, "test")
     ds_val, ds_info = get_heptfds_dataset(
         full_config["validation_datasets"][0],
         full_config,
-        num_gpus,
         "test",
         full_config["setup"]["num_events_validation"],
         supervised=False,
diff --git a/mlpf/tfmodel/utils.py b/mlpf/tfmodel/utils.py
index 0931ca136..1a571eafe 100644
--- a/mlpf/tfmodel/utils.py
+++ b/mlpf/tfmodel/utils.py
@@ -123,7 +123,7 @@ def delete_all_but_best_checkpoint(train_dir, dry_run):
     print("Removed all checkpoints in {} except {}".format(train_dir, best_ckpt))
 
 
-def get_strategy(num_cpus=1):
+def get_num_gpus_cuda():
     if isinstance(os.environ.get("CUDA_VISIBLE_DEVICES"), type(None)) or len(os.environ.get("CUDA_VISIBLE_DEVICES")) == 0:
         gpus = [-1]
         print(
@@ -137,6 +137,22 @@
     else:
         num_gpus = len(gpus)
         print("num_gpus:", num_gpus)
+    return num_gpus, gpus
+
+
+def get_strategy(num_cpus=1):
+
+    # Always use the correct number of threads that were requested
+    if num_cpus == 1:
+        print("Warning: num_cpus==1, using explicitly only one CPU thread")
+
+    os.environ["OMP_NUM_THREADS"] = str(num_cpus)
+    os.environ["TF_NUM_INTRAOP_THREADS"] = str(num_cpus)
+    os.environ["TF_NUM_INTEROP_THREADS"] = str(num_cpus)
+    tf.config.threading.set_inter_op_parallelism_threads(num_cpus)
+    tf.config.threading.set_intra_op_parallelism_threads(num_cpus)
+
+    num_gpus, gpus = get_num_gpus_cuda()
 
     if num_gpus > 1:
         # multiple GPUs selected
@@ -146,15 +162,15 @@
         # single GPU
         print("Using a single GPU with tf.distribute.OneDeviceStrategy()")
         strategy = tf.distribute.OneDeviceStrategy("gpu:{}".format(gpus[0]))
-    elif num_cpus > 1:
-        # CPU parallelization
-        print("Attempting CPU parallelization with tf.distribute.MirroredStrategy()...")
-        strategy = tf.distribute.MirroredStrategy()
     else:
         print("Fallback to CPU, using tf.distribute.OneDeviceStrategy('cpu')")
         strategy = tf.distribute.OneDeviceStrategy("cpu")
 
-    return strategy, num_gpus
+    num_batches_multiplier = 1
+    if num_gpus > 1:
+        num_batches_multiplier = num_gpus
+
+    return strategy, num_gpus, num_batches_multiplier
 
 
 def get_lr_schedule(config, steps):
@@ -287,7 +303,7 @@ def func(X, y, w):
     return func
 
 
-def get_heptfds_dataset(dataset_name, config, num_gpus, split, num_events=None, supervised=True):
+def get_heptfds_dataset(dataset_name, config, split, num_events=None, supervised=True):
     cds = config["dataset"]
 
     if cds["schema"] == "cms":
@@ -308,12 +324,12 @@ def get_heptfds_dataset(dataset_name, config, num_gpus, split, num_events=None,
     return ds, ds_info
 
 
-def load_and_interleave(dataset_names, config, num_gpus, split, batch_size):
+def load_and_interleave(dataset_names, config, num_batches_multiplier, split, batch_size):
     datasets = []
     steps = []
     total_num_steps = 0
     for ds_name in dataset_names:
-        ds, _ = get_heptfds_dataset(ds_name, config, num_gpus, split)
+        ds, _ = get_heptfds_dataset(ds_name, config, split)
         num_steps = ds.cardinality().numpy()
         total_num_steps += num_steps
         assert num_steps > 0
@@ -348,15 +364,17 @@ def load_and_interleave(dataset_names, config, num_gpus, split, batch_size):
             # the last bucket size is implicitly 'inf'
             bucket_boundaries=[int(x[0]) for x in bucket_batch_sizes[:-1]],
             # for multi-GPU, we need to multiply the batch size by the number of GPUs
-            bucket_batch_sizes=[int(x[1]) * num_gpus * config["batching"]["batch_multiplier"] for x in bucket_batch_sizes],
+            bucket_batch_sizes=[
+                int(x[1]) * num_batches_multiplier * config["batching"]["batch_multiplier"] for x in bucket_batch_sizes
+            ],
             drop_remainder=True,
         )
     # use fixed-size batching
     else:
         bs = batch_size
         if not config["setup"]["horovod_enabled"]:
-            if num_gpus > 1:
-                bs = bs * num_gpus
+            if num_batches_multiplier > 1:
+                bs = bs * num_batches_multiplier
         ds = ds.padded_batch(bs)
 
     # now iterate over the full dataset to get the number of steps
@@ -369,7 +387,7 @@ def load_and_interleave(dataset_names, config, num_gpus, split, batch_size):
 
 
 # Load multiple datasets and mix them together
-def get_datasets(datasets_to_interleave, config, num_gpus, split):
+def get_datasets(datasets_to_interleave, config, num_batches_to_load, split):
     datasets = []
     steps = []
     num_samples = 0
@@ -379,7 +397,7 @@ def get_datasets(datasets_to_interleave, config, num_gpus, split):
             logging.warning("No datasets in {} list.".format(joint_dataset_name))
         else:
             interleaved_ds, num_steps, ds_samples = load_and_interleave(
-                ds_conf["datasets"], config, num_gpus, split, ds_conf["batch_per_gpu"]
+                ds_conf["datasets"], config, num_batches_to_load, split, ds_conf["batch_per_gpu"]
             )
             print("Interleaved joint dataset {} with {} steps".format(joint_dataset_name, num_steps))
             datasets.append(interleaved_ds)
diff --git a/notebooks/clic.ipynb b/notebooks/clic.ipynb
index 67f9f2bd5..a5bd33062 100644
--- a/notebooks/clic.ipynb
+++ b/notebooks/clic.ipynb
@@ -32,14 +32,16 @@
    },
    "outputs": [],
    "source": [
-    "path = \"../data/clic/gev380ee_pythia6_ttbar_rfull201\""
+    "path = \"../data/clic/gev380ee_pythia6_qcd_all_rfull201/\""
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "id": "935316c1",
-   "metadata": {},
+   "metadata": {
+    "scrolled": false
+   },
    "outputs": [],
    "source": [
     "# Load the datasets, process to flattened (X,ygen,ycand) format\n",
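
The core of this patch is the refactored get_num_gpus_cuda()/get_strategy() pair in mlpf/tfmodel/utils.py: the thread pools are pinned before a tf.distribute strategy is chosen, and a batch-size multiplier is reported for multi-GPU runs. The snippet below is a condensed standalone sketch of that pattern, not part of the patch itself; the num_cpus value (4) stands in for whatever is passed through the pipeline's --num-cpus flag, and the single-GPU device string is simplified to "gpu:0".

import os

num_cpus = 4  # placeholder for the value normally supplied via --num-cpus

# Pin the CPU thread pools before the TensorFlow runtime starts, so the
# requested number of threads is actually used (the behaviour this patch fixes).
os.environ["OMP_NUM_THREADS"] = str(num_cpus)
os.environ["TF_NUM_INTRAOP_THREADS"] = str(num_cpus)
os.environ["TF_NUM_INTEROP_THREADS"] = str(num_cpus)

import tensorflow as tf

tf.config.threading.set_inter_op_parallelism_threads(num_cpus)
tf.config.threading.set_intra_op_parallelism_threads(num_cpus)

# Strategy selection mirrors get_strategy(): several GPUs -> MirroredStrategy,
# one GPU -> OneDeviceStrategy on that GPU, no GPUs -> run on the CPU.
visible = os.environ.get("CUDA_VISIBLE_DEVICES", "")
gpus = [x for x in visible.split(",") if x.strip() not in ("", "-1")]
if len(gpus) > 1:
    strategy = tf.distribute.MirroredStrategy()
elif len(gpus) == 1:
    strategy = tf.distribute.OneDeviceStrategy("gpu:0")
else:
    strategy = tf.distribute.OneDeviceStrategy("cpu")

# The effective batch size is scaled only when more than one GPU runs in sync,
# which is what the num_batches_multiplier return value expresses in the patch.
num_batches_multiplier = len(gpus) if len(gpus) > 1 else 1
print("strategy:", strategy, "num_batches_multiplier:", num_batches_multiplier)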