Commit

Implement multi-gpu training in HPO with Ray Tune and Ray Train (jpata#277)

* implement multi-gpu training in HPO with Ray Tune and Ray Train

* update flatiron scripts for HPO in pytorch

* update flatiron scripts for hpo on pytorch backend

* simplify ray hpo training function

* implement distributed training using Ray Train

* add support for HyperOpt and ASHA

* clean up commented code

* feat: distributed multi-node training using Ray Train

* fix CI test

* feat: enable comet ml logging when using Ray Train
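
The bullets above combine Ray Tune (hyperparameter search with HyperOpt and ASHA) with Ray Train (data-parallel training of each trial across several GPUs). Below is a minimal sketch of how these two pieces are typically wired together; the model, data, search space, metric name and worker count are illustrative placeholders, not this repository's actual HPO entry point.

# Hedged sketch: Ray Tune drives a Ray Train TorchTrainer so that every trial
# trains data-parallel on several GPUs. All names here are placeholders.
import torch
from ray import train, tune
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, prepare_model
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.hyperopt import HyperOptSearch


def train_loop_per_worker(config):
    # Each Ray Train worker executes this loop on its own GPU.
    model = prepare_model(torch.nn.Linear(10, 1))  # placeholder model, wrapped in DDP
    optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])
    for _ in range(config.get("epochs", 3)):
        x, y = torch.randn(32, 10), torch.randn(32, 1)  # placeholder batch
        loss = torch.nn.functional.mse_loss(model(x), y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train.report({"val_loss": loss.item()})  # metric consumed by Tune/ASHA


trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),  # GPUs per trial
)

tuner = tune.Tuner(
    trainer,
    # Tune samples a config per trial and passes it to the train loop above.
    param_space={"train_loop_config": {"lr": tune.loguniform(1e-5, 1e-2)}},
    tune_config=tune.TuneConfig(
        metric="val_loss",
        mode="min",
        num_samples=20,
        search_alg=HyperOptSearch(),  # HyperOpt-based search
        scheduler=ASHAScheduler(),    # ASHA early stopping of weak trials
    ),
)
results = tuner.fit()
print(results.get_best_result().config)
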
erwulff authored Nov 27, 2023
1 parent d221dfb commit a172b3a
Showing 21 changed files with 1,307 additions and 423 deletions.
2 changes: 1 addition & 1 deletion mlpf/cuda_test.py
@@ -23,7 +23,7 @@

def main():
args = parser.parse_args()
-world_size = len(args.gpus.split(",")) # will be 1 for both cpu ("") and single-gpu ("0")
+world_size = args.gpus # will be 1 for both cpu ("") and single-gpu ("0")

if args.gpus:
assert (
53 changes: 53 additions & 0 deletions mlpf/pyg/PFDataset.py
@@ -8,6 +8,8 @@
from torch import Tensor
from torch_geometric.data import Batch, Data

from pyg.logger import _logger


class PFDataset:
"""Builds a DataSource from tensorflow datasets."""
@@ -164,3 +166,54 @@ def __len__(self):
len_ += len(self.data_loaders_iter[iloader])
self._len = len_
return len_


def get_interleaved_dataloaders(world_size, rank, config, use_cuda, pad_3d, use_ray):
loaders = {}
for split in ["train", "valid"]: # build train, valid dataset and dataloaders
loaders[split] = []
# build dataloaders for physical and gun samples separately
for type_ in config[f"{split}_dataset"][config["dataset"]]: # will be "physical", "gun", "multiparticlegun"
dataset = []
for sample in config[f"{split}_dataset"][config["dataset"]][type_]["samples"]:
version = config[f"{split}_dataset"][config["dataset"]][type_]["samples"][sample]["version"]

ds = PFDataset(config["data_dir"], f"{sample}:{version}", split, num_samples=config[f"n{split}"]).ds

if (rank == 0) or (rank == "cpu"):
_logger.info(f"{split}_dataset: {sample}, {len(ds)}", color="blue")

dataset.append(ds)
dataset = torch.utils.data.ConcatDataset(dataset)

# build dataloaders
batch_size = config[f"{split}_dataset"][config["dataset"]][type_]["batch_size"] * config["gpu_batch_multiplier"]

if world_size > 1:
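# DistributedSampler gives every rank a distinct shard of the dataset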
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
else:
sampler = torch.utils.data.RandomSampler(dataset)

loader = PFDataLoader(
dataset,
batch_size=batch_size,
collate_fn=Collater(["X", "ygen"], pad_3d=pad_3d),
sampler=sampler,
num_workers=config["num_workers"],
prefetch_factor=config["prefetch_factor"],
pin_memory=use_cuda,
pin_memory_device="cuda:{}".format(rank) if use_cuda else "",
)

if use_ray:
import ray

# prepare loader for distributed training, adds distributed sampler
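# and moves each batch to the worker's assigned device automatically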
loader = ray.train.torch.prepare_data_loader(loader)

loaders[split].append(loader)

loaders[split] = InterleavedIterator(loaders[split])  # will interleave a maximum of three dataloaders (one per sample type)
return loaders
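
For orientation, the sketch below shows a minimal configuration dict and a single-process call, inferred purely from the keys get_interleaved_dataloaders reads; the dataset key, sample names and versions are hypothetical placeholders rather than the samples used in this project.

# Hedged sketch of a config accepted by get_interleaved_dataloaders; all names
# and versions below are placeholders.
config = {
    "data_dir": "/path/to/tensorflow_datasets",
    "dataset": "clic",                # placeholder dataset key
    "gpu_batch_multiplier": 1,
    "num_workers": 2,
    "prefetch_factor": 2,
    "ntrain": None,                   # None: use every training event
    "nvalid": None,
    "train_dataset": {
        "clic": {
            "physical": {
                "batch_size": 32,
                "samples": {"some_physics_sample": {"version": "1.0.0"}},  # placeholder
            }
        }
    },
    "valid_dataset": {
        "clic": {
            "physical": {
                "batch_size": 32,
                "samples": {"some_physics_sample": {"version": "1.0.0"}},  # placeholder
            }
        }
    },
}

# Single-process CPU call: world_size=1 selects a RandomSampler, rank="cpu" keeps the
# logging branch active, and use_ray=False skips ray.train.torch.prepare_data_loader.
loaders = get_interleaved_dataloaders(
    world_size=1, rank="cpu", config=config, use_cuda=False, pad_3d=True, use_ray=False
)
for batch in loaders["train"]:
    ...  # batches arrive interleaved across the per-sample-type dataloaders
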
