fix num_cpus flag (#150)
* Fix the num_cpus flag so that it actually changes the number of CPU threads used.
* Fix evaluation and CLIC postprocessing.
jpata authored on Nov 21, 2022
1 parent: 139dd61 · commit: 7a46466
Showing 4 changed files with 69 additions and 43 deletions.
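The core of the fix lives in get_strategy() in mlpf/tfmodel/utils.py (diff below): the requested CPU count is now propagated to the OpenMP and TensorFlow thread-pool settings instead of being silently ignored. A minimal standalone sketch of the same mechanism, with an illustrative num_cpus value of 4 (not taken from the repository); the environment variables and tf.config.threading calls have to run before TensorFlow spins up its thread pools:

```python
import os

num_cpus = 4  # illustrative value for the requested CPU thread count

# Must be set before TensorFlow initializes its thread pools.
os.environ["OMP_NUM_THREADS"] = str(num_cpus)
os.environ["TF_NUM_INTRAOP_THREADS"] = str(num_cpus)
os.environ["TF_NUM_INTEROP_THREADS"] = str(num_cpus)

import tensorflow as tf  # imported only after the environment is configured

tf.config.threading.set_inter_op_parallelism_threads(num_cpus)
tf.config.threading.set_intra_op_parallelism_threads(num_cpus)

# Confirm the settings took effect.
print(tf.config.threading.get_inter_op_parallelism_threads())  # 4
print(tf.config.threading.get_intra_op_parallelism_threads())  # 4
```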
mlpf/data_clic/postprocessing.py: 3 additions, 0 deletions
@@ -168,6 +168,9 @@ def prepare_data_clic(fn):
         df_cl = data[iev]["clusters"]
         df_tr = data[iev]["tracks"]
         df_pfs = data[iev]["pfs"]
+        print("Clusters={}, tracks={}, PFs={}, Gen={}".format(len(df_cl), len(df_tr), len(df_pfs), len(df_gen)))
+        if len(df_pfs) < 10:
+            continue
 
         # compute pt, px,py,pz
         df_tr["pt"] = track_pt(df_tr["omega"])
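The postprocessing change prints a per-event summary and drops events with fewer than 10 reconstructed PF candidates. A self-contained sketch of the same filtering pattern on made-up toy events (the dict keys mirror the diff; the data does not come from the repository):

```python
# Toy stand-ins for the per-event dicts read by prepare_data_clic().
events = [
    {"clusters": [0] * 20, "tracks": [0] * 15, "pfs": [0] * 12, "gen": [0] * 30},
    {"clusters": [0] * 3, "tracks": [0] * 2, "pfs": [0] * 4, "gen": [0] * 5},
]

kept = []
for ev in events:
    print(
        "Clusters={}, tracks={}, PFs={}, Gen={}".format(
            len(ev["clusters"]), len(ev["tracks"]), len(ev["pfs"]), len(ev["gen"])
        )
    )
    if len(ev["pfs"]) < 10:  # too few PF candidates: skip this event
        continue
    kept.append(ev)

print("kept {} of {} events".format(len(kept), len(events)))  # kept 1 of 2 events
```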
mlpf/pipeline.py: 30 additions, 27 deletions
@@ -145,12 +145,10 @@ def train(
     config = customization_functions[customize](config)
 
     # Decide tf.distribute.strategy depending on number of available GPUs
+    horovod_enabled = horovod_enabled or config["setup"]["horovod_enabled"]
+
     if horovod_enabled:
-        pass
-    else:
-        horovod_enabled = config["setup"]["horovod_enabled"]
-    if horovod_enabled:
-        num_gpus = initialize_horovod()
+        num_gpus, num_batches_multiplier = initialize_horovod()
     elif habana:
         import habana_frameworks.tensorflow as htf
 
@@ -160,9 +158,9 @@ def train(
         logging.info("Using habana_frameworks.tensorflow.distribute.HPUStrategy")
         strategy = HPUStrategy()
         num_gpus = 1
+        num_batches_multiplier = 1
     else:
-        strategy, num_gpus = get_strategy(num_cpus=num_cpus)
-        devices = num_gpus or num_cpus  # devices = num_cpus if num_gpus is 0 or None
+        strategy, num_gpus, num_batches_multiplier = get_strategy()
 
     outdir = ""
     if not horovod_enabled or hvd.rank() == 0:
@@ -206,12 +204,15 @@ def train(
         with open(f"{outdir}/{jobid}.txt", "w") as f:
             f.write(f"{jobid}\n")
 
-    ds_train, num_train_steps, train_samples = get_datasets(config["train_test_datasets"], config, devices, "train")
-    ds_test, num_test_steps, test_samples = get_datasets(config["train_test_datasets"], config, devices, "test")
+    ds_train, num_train_steps, train_samples = get_datasets(
+        config["train_test_datasets"], config, num_batches_multiplier, "train"
+    )
+    ds_test, num_test_steps, test_samples = get_datasets(
+        config["train_test_datasets"], config, num_batches_multiplier, "test"
+    )
     ds_val, ds_info = get_heptfds_dataset(
         config["validation_datasets"][0],
         config,
-        devices,
         "test",
         config["setup"]["num_events_validation"],
         supervised=False,
@@ -393,7 +394,11 @@ def initialize_horovod():
     if gpus:
         tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU")
 
-    return hvd.size()
+    num_batches_multiplier = 1
+    if hvd.size() > 1:
+        num_batches_multiplier = hvd.size()
+
+    return hvd.size(), num_batches_multiplier
 
 
 @main.command()
@@ -415,8 +420,8 @@ def compute_validation_loss(config, train_dir, weights):
     else:
         model_dtype = tf.dtypes.float32
 
-    strategy, num_gpus = get_strategy()
-    ds_test, num_test_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "test")
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
+    ds_test, num_test_steps = get_datasets(config["train_test_datasets"], config, num_batches_multiplier, "test")
 
     with strategy.scope():
         model = make_model(config, model_dtype)
@@ -471,7 +476,7 @@ def evaluate(config, train_dir, weights, customize, nevents):
     else:
         model_dtype = tf.dtypes.float32
 
-    strategy, num_gpus = get_strategy()
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
 
     # disable small graph optimization for onnx export (tf.cond is not well supported)
     if "small_graph_opt" in config["setup"]:
@@ -492,7 +497,7 @@ def evaluate(config, train_dir, weights, customize, nevents):
     iepoch = int(weights.split("/")[-1].split("-")[1])
 
     for dsname in config["validation_datasets"]:
-        ds_test, _ = get_heptfds_dataset(dsname, config, num_gpus, "test", supervised=False)
+        ds_test, _ = get_heptfds_dataset(dsname, config, "test", supervised=False)
         if nevents:
             ds_test = ds_test.take(nevents)
         ds_test = ds_test.padded_batch(config["validation_batch_size"])
@@ -515,9 +520,9 @@ def find_lr(config, outdir, figname, logscale):
     config, _ = parse_config(config)
 
     # Decide tf.distribute.strategy depending on number of available GPUs
-    strategy, num_gpus = get_strategy()
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
 
-    ds_train, num_train_steps = get_datasets(config["train_test_datasets"], config, num_gpus, "train")
+    ds_train, num_train_steps = get_datasets(config["train_test_datasets"], config, num_batches_multiplier, "train")
 
     with strategy.scope():
         opt = tf.keras.optimizers.Adam(learning_rate=1e-7)  # This learning rate will be changed by the lr_finder
@@ -584,16 +589,13 @@ def hypertune(config, outdir, ntrain, ntest, recreate):
     if config["hypertune"]["algorithm"] == "hyperband":
         config["setup"]["num_epochs"] = config["hypertune"]["hyperband"]["max_epochs"]
 
-    strategy, num_gpus = get_strategy()
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
 
-    ds_train, ds_info = get_heptfds_dataset(
-        config["training_dataset"], config, num_gpus, "train", config["setup"]["num_events_train"]
-    )
-    ds_test, _ = get_heptfds_dataset(config["testing_dataset"], config, num_gpus, "test", config["setup"]["num_events_test"])
+    ds_train, ds_info = get_heptfds_dataset(config["training_dataset"], config, "train", config["setup"]["num_events_train"])
+    ds_test, _ = get_heptfds_dataset(config["testing_dataset"], config, "test", config["setup"]["num_events_test"])
     ds_val, _ = get_heptfds_dataset(
         config["validation_datasets"][0],
         config,
-        num_gpus,
         "test",
         config["setup"]["num_events_validation"],
         supervised=False,
@@ -666,14 +668,15 @@ def build_model_and_train(config, checkpoint_dir=None, full_config=None, ntrain=
             offline_directory=tune.get_trial_dir() + "/cometml",
         )
 
-    strategy, num_gpus = get_strategy()
+    strategy, num_gpus, num_batches_multiplier = get_strategy()
 
-    ds_train, num_train_steps = get_datasets(full_config["train_test_datasets"], full_config, num_gpus, "train")
-    ds_test, num_test_steps = get_datasets(full_config["train_test_datasets"], full_config, num_gpus, "test")
+    ds_train, num_train_steps = get_datasets(
+        full_config["train_test_datasets"], full_config, num_batches_multiplier, "train"
+    )
+    ds_test, num_test_steps = get_datasets(full_config["train_test_datasets"], full_config, num_batches_multiplier, "test")
     ds_val, ds_info = get_heptfds_dataset(
         full_config["validation_datasets"][0],
         full_config,
-        num_gpus,
         "test",
         full_config["setup"]["num_events_validation"],
         supervised=False,
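Every entry point in pipeline.py now unpacks three values, `strategy, num_gpus, num_batches_multiplier = get_strategy()` (or the Horovod/Habana equivalent), and forwards the multiplier rather than the raw device count to the dataset helpers. A pure-Python sketch of that contract with illustrative numbers (`batch_per_gpu = 4` is made up, not a repository default):

```python
def batches_multiplier(world_size: int) -> int:
    # Mirrors initialize_horovod()/get_strategy(): scale only when more than
    # one worker or GPU is in use, otherwise keep a multiplier of 1.
    return world_size if world_size > 1 else 1

batch_per_gpu = 4
for world_size in (1, 2, 8):
    print(world_size, batch_per_gpu * batches_multiplier(world_size))
# 1 4
# 2 8
# 8 32
```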
mlpf/tfmodel/utils.py: 32 additions, 14 deletions
@@ -123,7 +123,7 @@ def delete_all_but_best_checkpoint(train_dir, dry_run):
         print("Removed all checkpoints in {} except {}".format(train_dir, best_ckpt))
 
 
-def get_strategy(num_cpus=1):
+def get_num_gpus_cuda():
     if isinstance(os.environ.get("CUDA_VISIBLE_DEVICES"), type(None)) or len(os.environ.get("CUDA_VISIBLE_DEVICES")) == 0:
         gpus = [-1]
         print(
@@ -137,6 +137,22 @@ def get_strategy(num_cpus=1):
     else:
         num_gpus = len(gpus)
     print("num_gpus:", num_gpus)
+    return num_gpus, gpus
+
+
+def get_strategy(num_cpus=1):
+
+    # Always use the correct number of threads that were requested
+    if num_cpus == 1:
+        print("Warning: num_cpus==1, using explicitly only one CPU thread")
+
+    os.environ["OMP_NUM_THREADS"] = str(num_cpus)
+    os.environ["TF_NUM_INTRAOP_THREADS"] = str(num_cpus)
+    os.environ["TF_NUM_INTEROP_THREADS"] = str(num_cpus)
+    tf.config.threading.set_inter_op_parallelism_threads(num_cpus)
+    tf.config.threading.set_intra_op_parallelism_threads(num_cpus)
+
+    num_gpus, gpus = get_num_gpus_cuda()
 
     if num_gpus > 1:
         # multiple GPUs selected
@@ -146,15 +162,15 @@ def get_strategy(num_cpus=1):
         # single GPU
         print("Using a single GPU with tf.distribute.OneDeviceStrategy()")
         strategy = tf.distribute.OneDeviceStrategy("gpu:{}".format(gpus[0]))
-    elif num_cpus > 1:
-        # CPU parallelization
-        print("Attempting CPU parallelization with tf.distribute.MirroredStrategy()...")
-        strategy = tf.distribute.MirroredStrategy()
     else:
         print("Fallback to CPU, using tf.distribute.OneDeviceStrategy('cpu')")
         strategy = tf.distribute.OneDeviceStrategy("cpu")
 
-    return strategy, num_gpus
+    num_batches_multiplier = 1
+    if num_gpus > 1:
+        num_batches_multiplier = num_gpus
+
+    return strategy, num_gpus, num_batches_multiplier
 
 
 def get_lr_schedule(config, steps):
@@ -287,7 +303,7 @@ def func(X, y, w):
     return func
 
 
-def get_heptfds_dataset(dataset_name, config, num_gpus, split, num_events=None, supervised=True):
+def get_heptfds_dataset(dataset_name, config, split, num_events=None, supervised=True):
     cds = config["dataset"]
 
     if cds["schema"] == "cms":
@@ -308,12 +324,12 @@ def get_heptfds_dataset(dataset_name, config, num_gpus, split, num_events=None,
     return ds, ds_info
 
 
-def load_and_interleave(dataset_names, config, num_gpus, split, batch_size):
+def load_and_interleave(dataset_names, config, num_batches_multiplier, split, batch_size):
     datasets = []
     steps = []
     total_num_steps = 0
     for ds_name in dataset_names:
-        ds, _ = get_heptfds_dataset(ds_name, config, num_gpus, split)
+        ds, _ = get_heptfds_dataset(ds_name, config, split)
         num_steps = ds.cardinality().numpy()
         total_num_steps += num_steps
         assert num_steps > 0
@@ -348,15 +364,17 @@ def load_and_interleave(dataset_names, config, num_gpus, split, batch_size):
             # the last bucket size is implicitly 'inf'
            bucket_boundaries=[int(x[0]) for x in bucket_batch_sizes[:-1]],
             # for multi-GPU, we need to multiply the batch size by the number of GPUs
-            bucket_batch_sizes=[int(x[1]) * num_gpus * config["batching"]["batch_multiplier"] for x in bucket_batch_sizes],
+            bucket_batch_sizes=[
+                int(x[1]) * num_batches_multiplier * config["batching"]["batch_multiplier"] for x in bucket_batch_sizes
+            ],
             drop_remainder=True,
         )
     # use fixed-size batching
     else:
         bs = batch_size
         if not config["setup"]["horovod_enabled"]:
-            if num_gpus > 1:
-                bs = bs * num_gpus
+            if num_batches_multiplier > 1:
+                bs = bs * num_batches_multiplier
         ds = ds.padded_batch(bs)
 
     # now iterate over the full dataset to get the number of steps
@@ -369,7 +387,7 @@ def load_and_interleave(dataset_names, config, num_gpus, split, batch_size):
 
 
 # Load multiple datasets and mix them together
-def get_datasets(datasets_to_interleave, config, num_gpus, split):
+def get_datasets(datasets_to_interleave, config, num_batches_to_load, split):
     datasets = []
     steps = []
     num_samples = 0
@@ -379,7 +397,7 @@ def get_datasets(datasets_to_interleave, config, num_gpus, split):
             logging.warning("No datasets in {} list.".format(joint_dataset_name))
         else:
             interleaved_ds, num_steps, ds_samples = load_and_interleave(
-                ds_conf["datasets"], config, num_gpus, split, ds_conf["batch_per_gpu"]
+                ds_conf["datasets"], config, num_batches_to_load, split, ds_conf["batch_per_gpu"]
             )
             print("Interleaved joint dataset {} with {} steps".format(joint_dataset_name, num_steps))
             datasets.append(interleaved_ds)
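On the utils.py side, batch-size scaling now keys off num_batches_multiplier instead of num_gpus, both in the bucket-by-sequence-length path and in fixed-size batching. A small illustrative helper (the function name and bucket values are hypothetical; the arithmetic follows the diff, where each x is a (boundary, per-GPU batch size) pair):

```python
def scaled_bucket_batch_sizes(bucket_batch_sizes, num_batches_multiplier, batch_multiplier=1):
    # Per-GPU bucket batch sizes scaled by the device multiplier and by the
    # config-level batching.batch_multiplier, as in load_and_interleave().
    return [int(bs) * num_batches_multiplier * batch_multiplier for _, bs in bucket_batch_sizes]

# (sequence-length boundary, per-GPU batch size) pairs
buckets = [(256, 8), (512, 4), (1024, 2)]

print(scaled_bucket_batch_sizes(buckets, num_batches_multiplier=1))  # [8, 4, 2]
print(scaled_bucket_batch_sizes(buckets, num_batches_multiplier=4))  # [32, 16, 8]
```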
notebooks/clic.ipynb: 4 additions, 2 deletions
@@ -32,14 +32,16 @@
    },
    "outputs": [],
    "source": [
-    "path = \"../data/clic/gev380ee_pythia6_ttbar_rfull201\""
+    "path = \"../data/clic/gev380ee_pythia6_qcd_all_rfull201/\""
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "id": "935316c1",
-   "metadata": {},
+   "metadata": {
+    "scrolled": false
+   },
    "outputs": [],
    "source": [
     "# Load the datasets, process to flattened (X,ygen,ycand) format\n",

