Add inference command (#187)
* fix: error in raytune search space

* enable best practice settings for optimal model performance

* only max out NVIDIA L2 cache if GPUs are found

* enable benchmarking callback for clic dataset schema

* feat: configure tensorboard profiling from config file

* Update eval script on flatiron

* Update training batch script on flatiron

* Update raytune batch script on flatiron

* Add batch scripts for 8 GPU training on flatiron

* Update raytune search space file

* Setting numpy==1.23.5 in requirements.txt
Later versions are incompatible with tf2onnx 1.14.0 (latest at time of commit)

* Add inference command in TF pipeline script

* Additional flatiron batch scripts

Former-commit-id: 23fa559
erwulff authored Jul 28, 2023
1 parent dff8454 commit cec8890
Showing 5 changed files with 368 additions and 12 deletions.
62 changes: 62 additions & 0 deletions mlpf/flatiron/pipeline_infer.slurm
@@ -0,0 +1,62 @@
#!/bin/sh

# Walltime limit
#SBATCH -t 10:00:00
#SBATCH -N 1
#SBATCH --exclusive
#SBATCH --tasks-per-node=1

#SBATCH --mem=1000G
#SBATCH --cpus-per-task=112
#SBATCH -p eval
#SBATCH --constraint=sapphire
#SBATCH -w worker6302

# #SBATCH -p gpu
# #SBATCH --constraint=v100
# #SBATCH --gpus-per-task=1
# #SBATCH -w workergpu094

# Job name
#SBATCH -J pipeinfer

# Output and error logs
#SBATCH -o logs_slurm/log_%x_%j.out
#SBATCH -e logs_slurm/log_%x_%j.err

# Add jobscript to job output
echo "#################### Job submission script. #############################"
cat $0
echo "################# End of job submission script. #########################"

module --force purge; module load modules/2.1.1-20230405
module load slurm gcc cmake nccl cuda/11.8.0 cudnn/8.4.0.27-11.6 openmpi/4.0.7

nvidia-smi

source ~/miniconda3/bin/activate tf2
which python3
python3 --version

# make tensorflow find cupti (needed for profiling)
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mnt/sw/nix/store/3xpm36w2kcri3j1m5j15hg025my1p4kx-cuda-11.8.0/extras/CUPTI/lib64/

train_dir="/mnt/ceph/users/ewulff/particleflow/experiments/bsm10_1GPU_clic-gnn-tuned-v130_20230724_035617_375578.workergpu037"

# export CUDA_VISIBLE_DEVICES=0

## declare an array variable
declare -a bs=(1024 512 256 128 64 32 16 8 4 2 1)

## now loop through the above array
for i in "${bs[@]}"
do
    echo 'Starting inference.'
    python3 mlpf/pipeline.py infer \
        --train-dir $train_dir \
        --nevents 4000 \
        --bs "$i" \
        --num-runs 11 \
        -o /mnt/ceph/users/ewulff/particleflow/inference_tests/results_$SLURMD_NODENAME.json
    echo 'Inference done.'
done
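A usage sketch (assuming submission from the repository root; the hard-coded train_dir and results paths above would be adapted per user, and the logs_slurm directory referenced by the -o/-e options needs to exist before submission):

mkdir -p logs_slurm                        # Slurm writes log_%x_%j.{out,err} here
sbatch mlpf/flatiron/pipeline_infer.slurm  # runs the batch-size sweep from 1024 down to 1 on one eval node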
42 changes: 42 additions & 0 deletions mlpf/flatiron/pipeline_train_1GPU.slurm
@@ -0,0 +1,42 @@
#!/bin/sh

# Walltime limit
#SBATCH -t 7-00:00:00
#SBATCH -N 1
#SBATCH --tasks-per-node=1
#SBATCH -p gpu
#SBATCH --gpus-per-task=1
#SBATCH --constraint=a100-80gb,ib
#SBATCH --mem=200G

# Job name
#SBATCH -J pipetrain

# Output and error logs
#SBATCH -o logs_slurm/log_%x_%j.out
#SBATCH -e logs_slurm/log_%x_%j.err

# Add jobscript to job output
echo "#################### Job submission script. #############################"
cat $0
echo "################# End of job submission script. #########################"


module --force purge; module load modules/2.1.1-20230405
module load slurm gcc cmake nccl cuda/11.8.0 cudnn/8.4.0.27-11.6 openmpi/4.0.7

source ~/miniconda3/bin/activate tf2
which python3
python3 --version

# make tensorflow find cupti (needed for profiling)
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mnt/sw/nix/store/3xpm36w2kcri3j1m5j15hg025my1p4kx-cuda-11.8.0/extras/CUPTI/lib64/

export TF_GPU_THREAD_MODE=gpu_private
export TF_GPU_THREAD_COUNT=2
nvidia-smi

echo 'Starting training.'
CUDA_VISIBLE_DEVICES=0 python3 mlpf/pipeline.py train -c $1 -p $2 \
--seeds --comet-exp-name particleflow-tf-clic
echo 'Training done.'
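A usage sketch (assuming submission from the repository root; the two positional arguments are forwarded to the pipeline's -c and -p options, and the config path and prefix below are only illustrative placeholders):

mkdir -p logs_slurm
sbatch mlpf/flatiron/pipeline_train_1GPU.slurm parameters/clic.yaml my_run_prefix_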
84 changes: 84 additions & 0 deletions mlpf/flatiron/raytune_1GPUperTrial.sh
@@ -0,0 +1,84 @@
#!/bin/bash

#SBATCH -t 168:00:00
#SBATCH -N 8
#SBATCH --tasks-per-node=1
#SBATCH -p gpu
#SBATCH --constraint=a100,ib
#SBATCH --gpus-per-task=4
#SBATCH --cpus-per-task=64

# Job name
#SBATCH -J raytune

# Output and error logs
#SBATCH -o logs_slurm/log_%x_%j.out
#SBATCH -e logs_slurm/log_%x_%j.err

# Add jobscript to job output
echo "#################### Job submission script. #############################"
cat $0
echo "################# End of job submission script. #########################"

set -x

export TUNE_RESULT_DIR="/mnt/ceph/users/ewulff/ray_results/tune_result_dir"
export TUNE_MAX_PENDING_TRIALS_PG=$((SLURM_NNODES*2))

module --force purge; module load modules/2.0-20220630
module load slurm gcc cmake/3.22.3 nccl cuda/11.4.4 cudnn/8.2.4.15-11.4 openmpi/4.0.7
nvidia-smi

export CUDA_VISIBLE_DEVICES=0,1,2,3
num_gpus=4

source ~/miniconda3/bin/activate tf2
echo "Python used:"
which python3
python3 --version


################# DO NOT CHANGE THINGS HERE UNLESS YOU KNOW WHAT YOU ARE DOING ###############
# This script is a modification of the implementation suggested by gregSchwartz18 here:
# https://github.com/ray-project/ray/issues/826#issuecomment-522116599
redis_password=$(uuidgen)
export redis_password
echo "Redis password: ${redis_password}"

nodes=$(scontrol show hostnames $SLURM_JOB_NODELIST) # Getting the node names
nodes_array=( $nodes )

node_1=${nodes_array[0]}
ip=$(srun --nodes=1 --ntasks=1 -w $node_1 hostname --ip-address) # making redis-address
port=6379
ip_head=$ip:$port
export ip_head
echo "IP Head: $ip_head"

echo "STARTING HEAD at $node_1"
srun --nodes=1 --ntasks=1 -w $node_1 \
ray start --head --node-ip-address="$node_1" --port=$port \
--num-cpus $((SLURM_CPUS_PER_TASK)) --num-gpus $num_gpus --block & # mlpf/raytune/start-head.sh $ip $port &

sleep 10

worker_num=$(($SLURM_JOB_NUM_NODES - 1)) #number of nodes other than the head node
for (( i=1; i<=$worker_num; i++ ))
do
node_i=${nodes_array[$i]}
echo "STARTING WORKER $i at $node_i"
srun --nodes=1 --ntasks=1 -w $node_i \
ray start --address "$node_1":"$port" \
--num-cpus $((SLURM_CPUS_PER_TASK)) --num-gpus $num_gpus --block & # mlpf/raytune/start-worker.sh $ip_head &
sleep 5
done

echo All Ray workers started.
##############################################################################################

#### call your code below
python3 mlpf/pipeline.py raytune -c $1 -n $2 --cpus $((SLURM_CPUS_PER_TASK/4)) \
--gpus 1 --seeds --comet-exp-name particleflow-raytune
# --ntrain 100 --ntest 100 #--comet-online

exit
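A usage sketch (assuming submission from the repository root; the two positional arguments are forwarded to the pipeline's -c and -n options, where -n is presumably a name for the tuning run, and both values below are illustrative placeholders):

mkdir -p logs_slurm
sbatch mlpf/flatiron/raytune_1GPUperTrial.sh parameters/clic.yaml my_raytune_run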
164 changes: 163 additions & 1 deletion mlpf/pipeline.py
@@ -53,6 +53,7 @@
summarize_top_k,
topk_summary_plot_v2,
)
from tfmodel.callbacks import NpEncoder


@click.group()
@@ -397,7 +398,168 @@ def evaluate(config, train_dir, weights, customize, nevents):
Path(eval_dir).mkdir(parents=True, exist_ok=True)
eval_model(model, ds_test_tfds, config, eval_dir)

-    freeze_model(model, config, train_dir)
+    freeze_model(model, config, train_dir) # export to ONNX


@main.command()
@click.help_option("-h", "--help")
@click.option(
    "--train-dir",
    default=None,
    help="directory containing a completed training",
    type=click.Path(),
)
@click.option("--config", help="configuration file", type=click.Path())
@click.option(
    "--weights",
    default=None,
    help="trained weights to load",
    type=click.Path(),
)
@click.option("--bs", help="batch size to use for inference", type=int, default=1)
@click.option("--customize", help="customization function", type=str, default=None)
@click.option("--nevents", help="maximum number of events", type=int, default=-1)
@click.option("--verbose", help="verbose output", type=int, default=0)
@click.option("--num-runs", help="how many times to run the inference", type=int, default=2)
@click.option("-o", "--output", help="write summary of results to file", type=Path)
@click.option("--cpus", help="CPU threads", type=int, default=None)
def infer(config, train_dir, weights, bs, customize, nevents, verbose, num_runs, output, cpus):
    import json

    strategy, num_gpus, num_batches_multiplier = get_strategy(num_cpus=cpus) # sets TF ENV variables to use num_cpus
    assert num_gpus < 2, "Multi-GPU inference is not supported"

    if output:
        assert num_runs > 1, "If writing summary results to file, num_runs must be >1"

    if train_dir is None:
        assert (config is not None) and (
            weights is not None
        ), "Please provide a config and weight file when not giving train_dir"

    if config is None:
        config = Path(train_dir) / "config.yaml"
        assert config.exists(), "Could not find config file in train_dir, please provide one with -c <path/to/config>"
    config, _ = parse_config(config, weights=weights)

    if customize:
        config = customization_functions[customize](config)

    # disable small graph optimization for onnx export (tf.cond is not well supported by ONNX export)
    if "small_graph_opt" in config["setup"]:
        config["setup"]["small_graph_opt"] = False

    if not weights:
        weights = get_best_checkpoint(train_dir)
        logging.info("Loading best weights that could be found from {}".format(weights))

    model, _, initial_epoch = model_scope(config, 1, weights=weights)

    print("before loading")
    print("model.normalizer.mean:", model.normalizer.mean)
    print("model.normalizer.variance:", model.normalizer.variance)

    cache = np.load(config["setup"]["normalizer_cache"] + ".npz")
    model.normalizer.mean = tf.convert_to_tensor(cache["mean"])
    model.normalizer.variance = tf.convert_to_tensor(cache["variance"])
    print("after loading")
    print("model.normalizer.mean:", model.normalizer.mean)
    print("model.normalizer.variance:", model.normalizer.variance)

    num_events = nevents if nevents >= 0 else config["validation_num_events"]
    ds_val = mlpf_dataset_from_config(
        config["validation_dataset"],
        config,
        "test",
        num_events,
    )
    tfds_dataset = ds_val.tensorflow_dataset.padded_batch(bs)

    times = []
    predict_workers = bs # 1 worker per sample in the batch
    # TODO: don't hardcode maximum allowed workers
    if predict_workers > 112:
        predict_workers = 112 # ensure workers is not more than available cpu threads
    for i in range(num_runs):
        # Using model.predict(tf_dataset) doesn't work because pfnetdense is not hashable due to the use of slicing in
        # __call__. Hence, we have to loop through the dataset.
        print("\nRun {}/{}".format(i + 1, num_runs))
        print("####### Inference using model.predict(sample) ############")
        start_time = tf.timestamp().numpy()
        for elem in tqdm.tqdm(tfds_dataset, desc="Model inference"):
            _ = model.predict(
                elem["X"],
                verbose=verbose,
                workers=predict_workers,
                use_multiprocessing=(predict_workers > 1),
            )
        stop_time = tf.timestamp().numpy()
        total_time = stop_time - start_time
        times.append(total_time)
        print("Total number of events used: {:d}".format(num_events))
        print("Batch size: {:d}".format(bs))
        print("Total inference time: {:.2f}s".format(total_time))
        print("##########################################################")

    if num_runs > 1:
        # Summarizing results

        # event throughput [1/s]
        # - ignore batch padding
        throughput_per_run = num_events / np.array(times)

        # mean throughput
        # - ignore first epoch (lazy graph construction)
        mean_throughput = round(np.mean(throughput_per_run[1:]), 4)
        print("mean_throughput:", mean_throughput)

        # mean epoch time
        # - ignore first epoch (lazy graph construction)
        mean_run_time = round(np.mean(times[1:]), 4)
        # batch_size_total = bs * (num_gpus or num_cpus)
        print("mean_run_time:", mean_run_time)

        data = {
            "results": [
                {
                    "wl-scores": {
                        "mean_throughput": mean_throughput,
                        "mean_run_time": mean_run_time,
                    },
                    "wl-stats": {
                        "num_runs": len(times),
                        "run_times": np.round(times, 4),
                        "total_inference_time": round(sum(times[1:]), 4),
                        "GPU": num_gpus,
                        "CPU": cpus or -1,
                        # "train_set_size": self.train_set_size,
                        # "batch_size_per_device": self.batch_size_per_gpu,
                        # "batch_size_total": batch_size_total,
                        "batch_size": bs,
                        "steps_per_run": num_events // bs,
                        "events_per_run": num_events,
                        "throughput_per_run": list(np.round(throughput_per_run, 4)),
                    },
                }
            ],
        }

        if output:
            result_path = output.resolve()
            result_path.parent.mkdir(parents=True, exist_ok=True)

            if result_path.is_file():
                with result_path.open("r", encoding="utf-8") as f:
                    old_data = json.load(f)
            else:
                old_data = None

            with result_path.open("w", encoding="utf-8") as f:
                if old_data:
                    data = {"results": old_data["results"] + data["results"]}
                json.dump(data, f, ensure_ascii=False, indent=4, cls=NpEncoder)
                f.write("\n")
            print("Saved result to {}".format(result_path))


@main.command()
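
The new infer subcommand can also be invoked directly, outside of Slurm. A minimal sketch with illustrative paths, using only the options defined above (note that --num-runs must be greater than 1 when -o is given, and repeated runs pointed at the same -o file append to its "results" list):

python3 mlpf/pipeline.py infer \
    --train-dir experiments/<completed_training_dir> \
    --bs 64 --nevents 4000 --num-runs 11 \
    -o inference_results.json
python3 -m json.tool inference_results.json   # pretty-print the wl-scores / wl-stats summary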
