From 5cd9b99f7b0b035621612db604c3eb42a00c8474 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adrian=20W=C3=A4lchli?=
Date: Sat, 13 Feb 2021 00:25:19 +0100
Subject: [PATCH 1/3] remove legacy accelerators

---
 .../accelerators/legacy/__init__.py          |  25 -
 .../accelerators/legacy/accelerator.py       | 264 -----------
 .../legacy/accelerator_connector.py          | 440 ------------------
 .../accelerators/legacy/cpu_accelerator.py   |  81 ----
 .../accelerators/legacy/ddp2_accelerator.py  | 266 -----------
 .../accelerators/legacy/ddp_accelerator.py   | 379 ---------------
 .../legacy/ddp_cpu_hpc_accelerator.py        |  47 --
 .../legacy/ddp_cpu_spawn_accelerator.py      | 295 ------------
 .../legacy/ddp_hpc_accelerator.py            | 255 ----------
 .../legacy/ddp_spawn_accelerator.py          | 327 -------------
 .../accelerators/legacy/dp_accelerator.py    | 157 -------
 .../accelerators/legacy/gpu_accelerator.py   |  99 ----
 .../legacy/horovod_accelerator.py            | 198 --------
 .../accelerators/legacy/tpu_accelerator.py   | 345 --------------
 14 files changed, 3178 deletions(-)
 delete mode 100644 pytorch_lightning/accelerators/legacy/__init__.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/accelerator_connector.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/cpu_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/ddp2_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/ddp_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/ddp_cpu_hpc_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/ddp_cpu_spawn_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/ddp_hpc_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/ddp_spawn_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/dp_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/gpu_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/horovod_accelerator.py
 delete mode 100644 pytorch_lightning/accelerators/legacy/tpu_accelerator.py

diff --git a/pytorch_lightning/accelerators/legacy/__init__.py b/pytorch_lightning/accelerators/legacy/__init__.py
deleted file mode 100644
index a97edb21e504d..0000000000000
--- a/pytorch_lightning/accelerators/legacy/__init__.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright The PyTorch Lightning team.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from pytorch_lightning.accelerators.legacy.accelerator import Accelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.cpu_accelerator import CPUAccelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.ddp2_accelerator import DDP2Accelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.ddp_accelerator import DDPAccelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.ddp_cpu_hpc_accelerator import DDPCPUHPCAccelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.ddp_cpu_spawn_accelerator import DDPCPUSpawnAccelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.ddp_hpc_accelerator import DDPHPCAccelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.ddp_spawn_accelerator import DDPSpawnAccelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.dp_accelerator import DataParallelAccelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.gpu_accelerator import GPUAccelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.horovod_accelerator import HorovodAccelerator  # noqa: F401
-from pytorch_lightning.accelerators.legacy.tpu_accelerator import TPUAccelerator  # noqa: F401
diff --git a/pytorch_lightning/accelerators/legacy/accelerator.py b/pytorch_lightning/accelerators/legacy/accelerator.py
deleted file mode 100644
index e6bd74c2c039a..0000000000000
--- a/pytorch_lightning/accelerators/legacy/accelerator.py
+++ /dev/null
@@ -1,264 +0,0 @@
-# Copyright The PyTorch Lightning team.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from contextlib import contextmanager -from typing import Any, Optional, Union - -import torch -from torch.optim import Optimizer - -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.plugins.legacy.ddp_plugin import DDPPlugin -from pytorch_lightning.plugins.legacy.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities.apply_func import move_data_to_device -from pytorch_lightning.utilities.parsing import AttributeDict - -if torch.distributed.is_available(): - from torch.distributed import ReduceOp -else: - class ReduceOp: - SUM = None - - -class Accelerator(object): - - def __init__(self, - trainer: Optional = None, - cluster_environment: Optional[ClusterEnvironment] = None, - ddp_plugin: Optional[DDPPlugin] = None): - self.trainer = trainer - self.nickname = None - self.cluster_environment = cluster_environment - self.dist = AttributeDict(rank=0, device=None) - self.ddp_plugin = ddp_plugin - - if trainer is not None: - self.train_loop = self.trainer.train - self.validation_loop = self.trainer.run_evaluation - self.test_loop = self.trainer.run_evaluation - - def setup(self, model): - pass - - def train(self): - self.trainer.setup_trainer(self.trainer.model) - return self.train_or_test() - - def teardown(self): - # Ensure if necessary all processes are finished - self.barrier() - - def barrier(self, name: Optional[str] = None): - pass - - def broadcast(self, obj, src=0): - return obj - - def train_or_test(self): - if self.trainer.testing: - results = self.trainer.run_test() - else: - self.trainer.train_loop.setup_training() - results = self.trainer.train() - return results - - def batch_to_device(self, batch: Any, device: torch.device): - model = self.trainer.get_model() - if model is not None: - return model.transfer_batch_to_device(batch, device) - return move_data_to_device(batch, device) - - def training_step_end(self, output): - return output - - def test_step_end(self, output): - return output - - def validation_step_end(self, output): - return output - - def process_dataloader(self, dataloader): - return dataloader - - def backward(self, closure_loss, optimizer, opt_idx, *args, **kwargs): - automatic_optimization = self.trainer.train_loop.automatic_optimization - - if not automatic_optimization and self.ddp_plugin is not None: - # Manually prepare for reduce as user calling backwards manually - self.ddp_plugin.on_before_manual_backward(self.trainer.model, closure_loss) - - if self.trainer.precision == 16: - closure_loss = self.trainer.precision_connector.backend.backward( - closure_loss, optimizer, opt_idx, *args, **kwargs - ) - else: - # do backward pass - model = self.trainer.get_model() - model.backward(closure_loss, optimizer, opt_idx, *args, **kwargs) - - # once backward has been applied, release graph - closure_loss = closure_loss.detach() - return closure_loss - - def clip_gradients(self, optimizer, clip_val=None): - # use the trainer's clip val if none passed - grad_clip_val = self.trainer.gradient_clip_val - if clip_val is not None: - grad_clip_val = clip_val - grad_clip_val = float(grad_clip_val) - - if grad_clip_val <= 0: - return - self._clip_gradients(optimizer, grad_clip_val) - - def _clip_gradients(self, optimizer: Optimizer, grad_clip_val: Union[float, int], norm_type: float = 2.0): - if self.trainer.amp_backend: - self.trainer.precision_connector.backend.clip_gradients(grad_clip_val, optimizer, norm_type) - else: - model = self.trainer.get_model() - 
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=grad_clip_val, norm_type=norm_type) - - def on_train_epoch_end(self, outputs): - pass - - def on_train_end(self): - pass - - def early_stopping_should_stop(self, pl_module): - return self.trainer.should_stop - - def setup_optimizers(self, model): - if self.trainer.testing: - return - - optimizers, lr_schedulers, optimizer_frequencies = self.trainer.init_optimizers(model) - self.trainer.optimizers = optimizers - self.trainer.lr_schedulers = lr_schedulers - self.trainer.optimizer_frequencies = optimizer_frequencies - - def init_ddp_connection( - self, global_rank: int, world_size: int, is_slurm_managing_tasks: bool = True - ) -> None: - self.ddp_plugin.init_ddp_connection( - self.trainer, - self.cluster_environment, - global_rank, - world_size, - is_slurm_managing_tasks, - ) - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - """ - Function to reduce a tensor from several distributed processes to one aggregated tensor. - - Args: - tensor: the tensor to sync and reduce - group: the process group to gather results from. Defaults to all processes (world) - reduce_op: the reduction operation. Defaults to sum. - Can also be a string of 'avg', 'mean' to calculate the mean during reduction. - - Return: - reduced value - """ - raise NotImplementedError() - - def all_gather(self, tensor: Union[torch.Tensor], group: Optional[Any] = None, sync_grads: bool = False): - """ - Function to gather a tensor from several distributed processes - - Args: - tensor: tensor of shape (batch, ...) - group: the process group to gather results from. Defaults to all processes (world) - sync_grads: flag that allows users to synchronize gradients for all_gather op - - Return: - A tensor of shape (world_size, batch, ...) - """ - raise NotImplementedError() - - def optimizer_state(self, optimizer: Optimizer) -> dict: - """ - Returns state of an optimizer. Allows for syncing/collating optimizer state from processes in custom - plugins. - - Return: - Optimizer state dict - """ - if self.ddp_plugin: - return self.ddp_plugin.optimizer_state(optimizer) - return optimizer.state_dict() - - def get_reference_model(self, model) -> LightningModule: - """ - Override to modify returning base :class:`LightningModule` - when accessing variable and functions if the accelerator has wrapped the model. - - Example:: - ref_model = accelerator.get_reference_model(model) - ref_model.training_step(...) - - Args: - model: Accelerator model. - - Returns: - Reference :class:`LightningModule`. - - """ - return model - - def __getstate__(self): - return { - 'trainer': self.trainer, - 'nickname': self.nickname, - 'cluster_environment': self.cluster_environment, - 'dist': self.dist, - 'ddp_plugin': self.ddp_plugin - } - - def __setstate__(self, d): - self.trainer = d['trainer'] - self.nickname = d['nickname'] - self.cluster_environment = d['cluster_environment'] - self.dist = d['dist'] - self.ddp_plugin = d['ddp_plugin'] - - def on_save(self, checkpoint): - return checkpoint - - @property - def rpc_enabled(self): - return self.ddp_plugin is not None and isinstance(self.ddp_plugin, RPCPlugin) - - @property - def distributed_sampler_kwargs(self): - raise NotImplementedError - - @property - def require_distributed_sampler(self): - raise NotImplementedError - - @contextmanager - def block_ddp_plugin_sync_behaviour(self): - """ - Blocks ddp sync gradients behaviour on backwards pass. 
- This is useful for skipping sync when accumulating gradients, reducing communication overhead - - Returns: - context manager with sync behaviour off - """ - cm = self.ddp_plugin.block_backward_sync(self.trainer.model) if self.ddp_plugin else None - yield cm diff --git a/pytorch_lightning/accelerators/legacy/accelerator_connector.py b/pytorch_lightning/accelerators/legacy/accelerator_connector.py deleted file mode 100644 index 7d012b870b03a..0000000000000 --- a/pytorch_lightning/accelerators/legacy/accelerator_connector.py +++ /dev/null @@ -1,440 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os - -import torch - -from pytorch_lightning import _logger as log -from pytorch_lightning import accelerators -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator -from pytorch_lightning.plugins.environments.slurm_environment import SLURMEnvironment -from pytorch_lightning.plugins.environments.torchelastic_environment import TorchElasticEnvironment -from pytorch_lightning.utilities import ( - _HOROVOD_AVAILABLE, - _TPU_AVAILABLE, - device_parser, - DeviceType, - DistributedType, - rank_zero_only, -) -from pytorch_lightning.utilities.distributed import rank_zero_info, rank_zero_warn -from pytorch_lightning.utilities.exceptions import MisconfigurationException - -if _HOROVOD_AVAILABLE: - import horovod.torch as hvd - - -class AcceleratorConnector: - - def __init__(self, trainer): - self.trainer = trainer - self.accelerator = None - - def on_trainer_init( - self, - num_processes, - tpu_cores, - accelerator, - distributed_backend, - auto_select_gpus, - gpus, - num_nodes, - log_gpu_memory, - sync_batchnorm, - benchmark, - replace_sampler_ddp, - deterministic, - ): - # temp until we remove all dist backend references - distributed_backend = self._map_deprecated_dist_backend(accelerator, distributed_backend) - - self.trainer.deterministic = deterministic - - torch.backends.cudnn.deterministic = self.trainer.deterministic - if self.trainer.deterministic: - # fixing non-deterministic part of horovod - # https://github.com/PyTorchLightning/pytorch-lightning/pull/1572/files#r420279383 - os.environ["HOROVOD_FUSION_THRESHOLD"] = str(0) - - # distributed backend choice - self.trainer.distributed_backend = distributed_backend.lower() if distributed_backend else None - - # init the default rank if exists - # we need to call this here or NVIDIA flags and other messaging in init will show on all ranks - # this way we only show it on rank 0 - if 'LOCAL_RANK' in os.environ: - rank_zero_only.rank = int(os.environ['LOCAL_RANK']) - - # benchmarking - self.trainer.benchmark = benchmark - torch.backends.cudnn.benchmark = self.trainer.benchmark - - # Transfer params - self.trainer.num_nodes = num_nodes - self.trainer.log_gpu_memory = log_gpu_memory - - # sync-bn backend - self.trainer.sync_batchnorm = sync_batchnorm - - self._parse_tpu_device_details(tpu_cores) - - if num_processes != 1 and distributed_backend != "ddp_cpu": - 
rank_zero_warn("num_processes is only used for `accelerator='ddp_cpu'`. Ignoring it.") - self.trainer.num_processes = num_processes - - # override with environment flag - gpus = os.environ.get('PL_TRAINER_GPUS', gpus) - self.trainer.gpus = gpus - - # for gpus allow int, string and gpu list - if auto_select_gpus and isinstance(gpus, int): - self.trainer.gpus = self.trainer.tuner.pick_multiple_gpus(gpus) - - self.trainer.data_parallel_device_ids = device_parser.parse_gpu_ids(self.trainer.gpus) - self.trainer.root_gpu = device_parser.determine_root_gpu_device(self.trainer.data_parallel_device_ids) - - # distributed backend choice - self.set_distributed_mode() - - # init flags for SLURM+DDP to work - self.trainer.world_size = 1 - self.trainer.interactive_ddp_procs = [] - - # link up SLURM - # TODO: this should be taken out of here... but depends too much on DDP - self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes) - self.trainer.node_rank = self.determine_ddp_node_rank() - self.trainer.local_rank = self.determine_local_rank() - self.trainer.global_rank = 0 - - # NVIDIA setup - self.set_nvidia_flags(self.trainer.is_slurm_managing_tasks, self.trainer.data_parallel_device_ids) - - self.trainer.on_colab_kaggle = os.getenv('COLAB_GPU') or os.getenv('KAGGLE_URL_BASE') - - self.trainer.replace_sampler_ddp = replace_sampler_ddp - - def _parse_tpu_device_details(self, tpu_cores): - self.trainer.tpu_cores = device_parser.parse_tpu_cores(tpu_cores) - if self.trainer.tpu_cores is not None: - if _TPU_AVAILABLE: - self.trainer._device_type = DeviceType.TPU - self.trainer.distributed_backend = "tpu" - else: - raise MisconfigurationException( - f"You have requested {self.trainer.tpu_cores} TPU cores but none is available." - ) - - self.trainer.tpu_id = self.trainer.tpu_cores[0] if isinstance(self.trainer.tpu_cores, list) else None - - # tpu state flags - self.trainer.tpu_local_core_rank = None - self.trainer.tpu_global_core_rank = None - - def _map_deprecated_dist_backend(self, accelerator, distributed_backend): - if distributed_backend is not None: - rank_zero_warn( - '`distributed_backend` has been renamed to accelerator. 
Deprecated in 1.0.0, will be removed in 1.2.0', - DeprecationWarning - ) - - # temporary mapping until we remove all the distributed_backend references - if accelerator is not None: - self.accelerator = accelerator - if isinstance(accelerator, Accelerator): - self.accelerator.trainer = self - distributed_backend = self.accelerator.nickname - else: - distributed_backend = accelerator - return distributed_backend - - def _select_environment(self): - if self.trainer.plugin_connector.cloud_environment: - env = self.trainer.plugin_connector.cloud_environment - elif self.trainer.is_slurm_managing_tasks: - env = SLURMEnvironment() - elif self._is_using_torchelastic(): - env = TorchElasticEnvironment() - else: - env = TorchElasticEnvironment() - return env - - def _is_using_torchelastic(self): - te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ) - return te_flags_passed - - def select_accelerator(self): - if self.trainer.accelerator_backend is not None: - return self.trainer.accelerator_backend - - # ---------------------------------- - # Use the user provided accelerator - # ---------------------------------- - # use the one the user passed in - if self.accelerator is not None and isinstance(self.accelerator, Accelerator): - self.accelerator.trainer = self.trainer - self.accelerator.ddp_plugin = self.trainer.plugin_connector.ddp_plugin - acc = self.accelerator - return acc - - # ---------------------------------- - # choose an accelerator for the user - # ---------------------------------- - use_slurm_ddp = ( - self.trainer._distrib_type in (DistributedType.DDP, DistributedType.DDP_SPAWN) - and self.trainer.is_slurm_managing_tasks - ) - - # torchelastic or general non_slurm ddp - te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ) - use_torchelastic_ddp = ( - self.trainer._distrib_type in (DistributedType.DDP, DistributedType.DDP_SPAWN) and te_flags_passed - ) - - use_ddp_cpu_spawn = ( - self.trainer._distrib_type in (DistributedType.DDP, DistributedType.DDP_SPAWN) - and self.trainer._device_type == DeviceType.CPU - ) - - use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic() - use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks - - # ddp script mode uses the same flags as TE - # TODO: decouple from TE - if os.environ.get('PL_IN_DDP_SUBPROCESS', False): - use_torchelastic_ddp = False - - cluster_env = self._select_environment() - - # TODO: clean-up this branching as most just select class and uses the very same arguments - # choose the appropriate accelerator backend - if self.trainer._distrib_type == DistributedType.DDP2: - accelerator_backend = accelerators.DDP2Accelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_cpu_slurm: - accelerator_backend = accelerators.DDPCPUHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_slurm_ddp: - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_cpu_torch_elastic: - accelerator_backend = accelerators.DDPCPUHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_torchelastic_ddp: - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif self.trainer._distrib_type 
== DistributedType.DDP_SPAWN: - accelerator_backend = accelerators.DDPSpawnAccelerator( - self.trainer, - nprocs=self.trainer.num_processes, - cluster_environment=cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_cpu_spawn: - accelerator_backend = accelerators.DDPCPUSpawnAccelerator( - self.trainer, - nprocs=self.trainer.num_processes, - cluster_environment=cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif self.trainer.distributed_backend == "ddp": - accelerator_backend = accelerators.DDPAccelerator( - self.trainer, - cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif self.trainer._distrib_type == DistributedType.DP: - accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env) - - elif self.trainer._distrib_type == DistributedType.HOROVOD: - accelerator_backend = accelerators.HorovodAccelerator(self.trainer, cluster_env) - - elif self.trainer._device_type == DeviceType.GPU and self.trainer.num_gpus == 1: - accelerator_backend = accelerators.GPUAccelerator(self.trainer, cluster_env) - - elif self.trainer._device_type == DeviceType.TPU: - accelerator_backend = accelerators.TPUAccelerator(self.trainer, cluster_env) - - elif self.trainer.distributed_backend is None: - accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env) - else: - raise MisconfigurationException( - f'`Trainer(accelerator={self.trainer.distributed_backend}, num_nodes={self.trainer.num_nodes},' - f' num_processes={self.trainer.num_processes}, ...)` is not a supported backend for' - f' num_gpus={self.trainer.num_gpus}' - ) - - return accelerator_backend - - def set_distributed_mode(self): - - if self.trainer.distributed_backend is None: - if self.has_horovodrun(): - self._set_horovod_backend() - elif self.trainer.num_gpus == 0 and (self.trainer.num_nodes > 1 or self.trainer.num_processes > 1): - self.trainer._distrib_type = DistributedType.DDP - elif self.trainer.num_gpus > 1: - rank_zero_warn( - 'You requested multiple GPUs but did not specify a backend, e.g.' - ' `Trainer(accelerator="dp"|"ddp"|"ddp2")`. Setting `accelerator="ddp_spawn"` for you.' - ) - self.trainer.distributed_backend = "ddp_spawn" - - # special case with DDP on CPUs - if self.trainer.distributed_backend == "ddp_cpu": - self.trainer._distrib_type = DistributedType.DDP - self.trainer.data_parallel_device_ids = None - if self.trainer.num_gpus > 0: - rank_zero_warn( - 'You requested one or more GPUs, but set the backend to `ddp_cpu`. Training will not use GPUs.' - ) - if self.trainer.num_processes is None: - # define the max CPU available - self.trainer.num_processes = os.cpu_count() - # special case with TPUs - elif self.trainer.distributed_backend == 'tpu': - self.trainer._device_type = DeviceType.TPU - # set all other requested distrib. 
types adn if it was not set in the - elif self.trainer.distributed_backend and self.trainer._distrib_type is None: - self.trainer._distrib_type = DistributedType(self.trainer.distributed_backend) - - # unless you request explicitly for CPU and some GPU are available use them - _on_cpu = self.trainer.distributed_backend and 'cpu' in self.trainer.distributed_backend - if (self.trainer.num_gpus > 0 and not _on_cpu): - self.trainer._device_type = DeviceType.GPU - - _distrib_types = (DistributedType.DP, DistributedType.DDP, DistributedType.DDP_SPAWN, DistributedType.DDP2) - # DP and DDP2 cannot run without GPU - if (self.trainer.num_gpus == 0 and self.trainer._distrib_type in _distrib_types): - rank_zero_warn( - 'You requested distributed training on GPUs, but none is available, so we set backend to `ddp_cpu`.' - ) - # todo: in some cases it yield in comarison None and int - if ((self.trainer.num_nodes and self.trainer.num_nodes > 1) - or (self.trainer.num_processes and self.trainer.num_processes > 1)): - self.trainer._distrib_type = DistributedType.DDP - else: - rank_zero_warn('You are running on single node with no parallelization, so distributed has no effect.') - self.trainer._distrib_type = None - - # for DDP overwrite nb processes by requested GPUs - if (self.trainer._device_type == DeviceType.GPU - and self.trainer._distrib_type in (DistributedType.DDP, DistributedType.DDP_SPAWN)): - self.trainer.num_processes = self.trainer.num_gpus - - # Horovod si an extra case... - if self.trainer.distributed_backend == "horovod": - self._set_horovod_backend() - - # throw error to force user ddp or ddp2 choice - _ddp = (DistributedType.DDP, DistributedType.DDP_SPAWN, DistributedType.DDP2) - if (self.trainer.num_nodes > 1 and self.trainer._distrib_type not in _ddp): - raise MisconfigurationException( - 'DataParallel does not support num_nodes > 1. ' - 'To avoid this exception, set `accelerator="ddp"` or `accelerator="ddp2"`' - ) - - rank_zero_info( - f'GPU available: {torch.cuda.is_available()}, used: {self.trainer._device_type == DeviceType.GPU}' - ) - num_cores = self.trainer.tpu_cores if self.trainer.tpu_cores is not None else 0 - rank_zero_info(f'TPU available: {_TPU_AVAILABLE}, using: {num_cores} TPU cores') - - if torch.cuda.is_available() and self.trainer._device_type != DeviceType.GPU: - rank_zero_warn('GPU available but not used. Set the --gpus flag when calling the script.') - - def _set_horovod_backend(self): - self._check_horovod() - self.trainer._distrib_type = DistributedType.HOROVOD - - # Initialize Horovod to get rank / size info - hvd.init() - if self.trainer._device_type == DeviceType.GPU: - # Horovod assigns one local GPU per process - self.trainer.root_gpu = hvd.local_rank() - - def _check_horovod(self): - """Raises a `MisconfigurationException` if the Trainer is not configured correctly for Horovod.""" - if not _HOROVOD_AVAILABLE: - raise MisconfigurationException( - 'Requested `accelerator="horovod"`, but Horovod is not installed.' - 'Install with \n $HOROVOD_WITH_PYTORCH=1 pip install horovod[pytorch]' - ) - - if self.trainer.num_gpus > 1 or self.trainer.num_nodes > 1: - raise MisconfigurationException( - 'Horovod does not support setting num_nodes / num_gpus explicitly. Use ' - 'horovodrun / mpirun to configure the number of processes.' 
- ) - - @staticmethod - def has_horovodrun(): - """Returns True if running with `horovodrun` using Gloo or OpenMPI.""" - return 'OMPI_COMM_WORLD_RANK' in os.environ or 'HOROVOD_RANK' in os.environ - - def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids): - # Todo: required argument `is_slurm_managing_tasks` is not used - if data_parallel_device_ids is None: - return - - # set the correct cuda visible devices (using pci order) - os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" - all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())]) - devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids) - log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') - - def determine_local_rank(self): - if self.trainer.is_slurm_managing_tasks: - return int(os.environ['SLURM_LOCALID']) - return int(os.environ.get('LOCAL_RANK', 0)) - - def determine_ddp_node_rank(self): - if self.trainer.is_slurm_managing_tasks: - return int(os.environ['SLURM_NODEID']) - - # torchelastic uses the envvar GROUP_RANK, whereas other systems(?) use NODE_RANK. - # otherwise use given node rank or default to node rank 0 - env_vars = ['NODE_RANK', 'GROUP_RANK'] - node_ids = [(k, os.environ.get(k, None)) for k in env_vars] - node_ids = [(k, v) for k, v in node_ids if v is not None] - if len(node_ids) == 0: - return 0 - if len(node_ids) > 1: - log.warning(f"Multiple environment variables ({node_ids}) defined for node rank. Using the first one.") - k, rank = node_ids.pop() - rank_zero_info(f"Using environment variable {k} for node rank ({rank}).") - return int(rank) diff --git a/pytorch_lightning/accelerators/legacy/cpu_accelerator.py b/pytorch_lightning/accelerators/legacy/cpu_accelerator.py deleted file mode 100644 index 323b2720ebfc7..0000000000000 --- a/pytorch_lightning/accelerators/legacy/cpu_accelerator.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import Any, Callable, Optional, Union - -import torch - -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.utilities import AMPType -from pytorch_lightning.utilities.exceptions import MisconfigurationException - - -class CPUAccelerator(Accelerator): - - def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None): - """ - Runs training on CPU - - Example:: - - # default - trainer = Trainer(accelerator=CPUAccelerator()) - - """ - super().__init__(trainer, cluster_environment) - self.nickname = None - - def setup(self, model): - # run through amp wrapper - if self.trainer.amp_backend: - raise MisconfigurationException('amp + cpu is not supported. 
Please use a GPU option') - - # call setup after the ddp process has connected - self.trainer.call_setup_hook(model) - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - self.trainer.model = model - - def _step(self, model_step: Callable, args): - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = model_step(*args) - else: - output = model_step(*args) - return output - - def training_step(self, args): - return self._step(self.trainer.model.training_step, args) - - def validation_step(self, args): - return self._step(self.trainer.model.validation_step, args) - - def test_step(self, args): - return self._step(self.trainer.model.test_step, args) - - def predict(self, args): - return self._step(self.trainer.model.predict, args) - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - return tensor - - @property - def require_distributed_sampler(self): - return False diff --git a/pytorch_lightning/accelerators/legacy/ddp2_accelerator.py b/pytorch_lightning/accelerators/legacy/ddp2_accelerator.py deleted file mode 100644 index 601eb116054ec..0000000000000 --- a/pytorch_lightning/accelerators/legacy/ddp2_accelerator.py +++ /dev/null @@ -1,266 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License -from typing import Any, List, Optional, Union - -import torch -import torch.distributed as torch_distrib -from torch.nn.parallel import DistributedDataParallel - -from pytorch_lightning import _logger as log -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.core.step_result import Result -from pytorch_lightning.distributed.dist import LightningDistributed -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.plugins.legacy.ddp_plugin import DDPPlugin -from pytorch_lightning.plugins.legacy.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import AMPType -from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available, rank_zero_only, sync_ddp_if_available - - -class DDP2Accelerator(Accelerator): - - def __init__(self, - trainer, - cluster_environment: Optional[ClusterEnvironment] = None, - ddp_plugin: Optional[DDPPlugin] = None): - """ - Runs training using DDP2 strategy on a cluster - - Example:: - - # default - trainer = Trainer(accelerator=DDP2Accelerator()) - - """ - super().__init__(trainer, cluster_environment, ddp_plugin) - self.task_idx = None - self.dist = LightningDistributed() - self.nickname = 'ddp2' - - def setup(self, model): - self.trainer.model = model - self.task_idx = self.cluster_environment.local_rank() - - def train(self): - model = self.trainer.model - return self.ddp_train(process_idx=self.task_idx, mp_queue=None, model=model) - - def training_step(self, args): - return self._step(args) - - def validation_step(self, args): - return self._step(args) - - def test_step(self, args): - return self._step(args) - - def predict(self, args): - return self._step(args) - - def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = self.trainer.model(*args) - else: - output = self.trainer.model(*args) - return output - - def barrier(self, name: Optional[str] = None): - if torch_distrib.is_initialized(): - torch_distrib.barrier() - - def training_step_end(self, output): - if isinstance(output, Result): - output.dp_reduce() - return output - - def validation_step_end(self, output): - if isinstance(output, Result): - output.dp_reduce() - return output - - def test_step_end(self, output): - if isinstance(output, Result): - output.dp_reduce() - return output - - def set_world_ranks(self, process_idx): - # Todo: required argument `process_idx` is not used - self.trainer.local_rank = self.trainer.node_rank - self.trainer.global_rank = self.trainer.node_rank - self.trainer.world_size = self.trainer.num_nodes - - def broadcast(self, obj, src=0): - return self.dist.broadcast(obj) - - def init_device(self, process_idx): - self.trainer.root_gpu = process_idx - torch.cuda.set_device(self.trainer.root_gpu) - - def model_to_device(self, model): - model.cuda(self.trainer.root_gpu) - - def get_device_ids(self): - device_ids = self.trainer.data_parallel_device_ids - return device_ids - - def ddp_train(self, process_idx, mp_queue, model): - """ - Entry point for ddp - - Args: - process_idx: current process rank - mp_queue: multiprocessing queue - model: pointer to current :class:`LightningModule` - - Returns: - Dict with evaluation results - - """ - # Todo: required argument 
`mp_queue` is not used - # show progressbar only on progress_rank 0 - if (self.trainer.node_rank != 0 or process_idx != 0) and self.trainer.progress_bar_callback is not None: - self.trainer.progress_bar_callback.disable() - - # determine which process we are and world size - self.set_world_ranks(process_idx) - - # set warning rank - rank_zero_only.rank = self.trainer.global_rank - - # Initialize cuda device - self.init_device(process_idx) - - # set up server using proc 0's ip address - # try to init for 20 times at max in case ports are taken - # where to store ip_table - model.trainer = self.trainer - self.init_ddp_connection( - self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks - ) - - if isinstance(self.ddp_plugin, RPCPlugin): - if not self.ddp_plugin.is_main_rpc_process: - self.ddp_plugin.on_accelerator_exit_rpc_process(self.trainer) - self.ddp_plugin.exit_rpc_process() - if self.ddp_plugin.return_after_exit_rpc_process: - return - else: - self.ddp_plugin.on_main_rpc_connection(self.trainer) - - # call setup after the ddp process has connected - self.trainer.call_setup_hook(model) - - # on world_size=0 let everyone know training is starting - if self.trainer.is_global_zero and not torch.distributed.is_initialized(): - log.info('-' * 100) - log.info(f'distributed_backend={self.trainer.distributed_backend}') - log.info(f'All DDP processes registered. Starting ddp with {self.trainer.world_size} processes') - log.info('-' * 100) - - # call sync_bn before .cuda(), configure_apex and configure_ddp - if self.trainer.sync_batchnorm: - model = self.configure_sync_batchnorm(model) - - # move the model to the correct device - self.model_to_device(model) - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - self.ddp_plugin.on_after_setup_optimizers(self.trainer) - - # 16-bit - model = self.trainer.precision_connector.connect(model) - - # device ids change depending on the DDP setup - device_ids = self.get_device_ids() - - # allow user to configure ddp - model = self.configure_ddp(model, device_ids) - - self.trainer.setup_trainer(model) - - # train or test - results = self.train_or_test() - - # clean up memory - torch.cuda.empty_cache() - return results - - def configure_ddp( - self, model: LightningModule, device_ids: List[int] - ) -> DistributedDataParallel: - self.ddp_plugin.device_ids = device_ids - model = self.ddp_plugin.configure_ddp(model, device_ids) - return model - - def configure_sync_batchnorm(self, model: LightningModule) -> LightningModule: - """ - Add global batchnorm for a model spread across multiple GPUs and nodes. - - Override to synchronize batchnorm between specific process groups instead - of the whole world or use a different sync_bn like `apex`'s version. - - Args: - model: pointer to current :class:`LightningModule`. - - Return: - LightningModule with batchnorm layers synchronized between process groups - """ - model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=None) - - return model - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - return sync_ddp_if_available(tensor, group, reduce_op) - - def all_gather(self, tensor: Union[torch.Tensor], group: Optional[Any] = None, sync_grads: bool = False): - """ - Function to gather a tensor from several distributed processes - - Args: - tensor: tensor of shape (batch, ...) - group: the process group to gather results from. 
Defaults to all processes (world) - sync_grads: flag that allows users to synchronize gradients for all_gather op - - Return: - A tensor of shape (world_size, batch, ...) - """ - return all_gather_ddp_if_available(tensor, group=group, sync_grads=sync_grads) - - def get_reference_model(self, model) -> LightningModule: - return self.ddp_plugin.get_model_from_plugin(model) - - @property - def distributed_sampler_kwargs(self): - distributed_sampler_kwargs = dict( - num_replicas=self.trainer.num_nodes, - rank=self.trainer.global_rank - ) - if self.ddp_plugin is not None: - distributed_sampler_kwargs = self.ddp_plugin.distributed_sampler_kwargs(distributed_sampler_kwargs) - return distributed_sampler_kwargs - - @property - def require_distributed_sampler(self): - return True diff --git a/pytorch_lightning/accelerators/legacy/ddp_accelerator.py b/pytorch_lightning/accelerators/legacy/ddp_accelerator.py deleted file mode 100644 index 613cef3f1bfec..0000000000000 --- a/pytorch_lightning/accelerators/legacy/ddp_accelerator.py +++ /dev/null @@ -1,379 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License -import os -import subprocess -import sys -from os.path import abspath -from time import sleep -from typing import Any, List, Optional, Union - -import numpy as np -import torch -import torch.distributed as torch_distrib -from torch.nn.parallel import DistributedDataParallel - -from pytorch_lightning import _logger as log -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.distributed.dist import LightningDistributed -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.plugins.legacy.ddp_plugin import DDPPlugin -from pytorch_lightning.plugins.legacy.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import _HYDRA_AVAILABLE, AMPType -from pytorch_lightning.utilities.distributed import ( - all_gather_ddp_if_available, - find_free_network_port, - rank_zero_only, - sync_ddp_if_available, -) -from pytorch_lightning.utilities.exceptions import MisconfigurationException -from pytorch_lightning.utilities.seed import seed_everything - -if _HYDRA_AVAILABLE: - from hydra.core.hydra_config import HydraConfig - from hydra.utils import get_original_cwd, to_absolute_path - - -class DDPAccelerator(Accelerator): - - def __init__(self, - trainer: Optional = None, - cluster_environment: Optional[ClusterEnvironment] = None, - ddp_plugin: Optional[DDPPlugin] = None): - """ - Runs training using DDP strategy on a single machine (manually, not via cluster start) - - Example:: - - # default - trainer = Trainer(accelerator=DDPAccelerator()) - - """ - super().__init__(trainer, cluster_environment, ddp_plugin) - self.task_idx = None - self._has_spawned_children = False - self.interactive_ddp_procs = [] - self.dist = LightningDistributed() - self.nickname = 'ddp' - - def setup(self, model): - # first track model - 
self.trainer.model = model - - # start the other scripts - if os.environ.get('PL_IN_DDP_SUBPROCESS', '0') != '1': - self._call_children_scripts() - - # set the task idx - self.task_idx = int(os.environ['LOCAL_RANK']) - - def _call_children_scripts(self): - assert self.trainer.global_rank == 0 - self._check_can_spawn_children() - self._has_spawned_children = True - - os.environ['MASTER_ADDR'] = os.environ.get('MASTER_ADDR', '127.0.0.1') - os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', str(find_free_network_port())) - - # allow the user to pass the node rank - node_rank = '0' - node_rank = os.environ.get('NODE_RANK', node_rank) - node_rank = os.environ.get('GROUP_RANK', node_rank) - os.environ['NODE_RANK'] = node_rank - os.environ['LOCAL_RANK'] = '0' - - # when user is using hydra find the absolute path - path_lib = abspath if not _HYDRA_AVAILABLE else to_absolute_path - - # pull out the commands used to run the script and resolve the abs file path - command = sys.argv - try: - full_path = path_lib(command[0]) - # todo: specify the possible exception - except Exception: - full_path = abspath(command[0]) - - command[0] = full_path - # use the same python interpreter and actually running - command = [sys.executable] + command - - # the visible devices tell us how many GPUs we want to use. - # when the trainer script was called the device has already been scoped by the time - # code reaches this point. so, to call the scripts, we need to leave cuda visible devices alone - # but forward the GPUs selected via environment variables - if self.trainer.data_parallel_device_ids is None: - raise MisconfigurationException('you selected (distribute_backend = ddp) but did not set Trainer(gpus=?)') - - os.environ['PL_TRAINER_GPUS'] = ','.join([str(i) for i in self.trainer.data_parallel_device_ids]) - os.environ['PL_IN_DDP_SUBPROCESS'] = '1' - - if self.trainer.logger is not None: - os.environ['PL_EXP_VERSION'] = str(self.trainer.logger.version) - - num_gpus = len(self.trainer.data_parallel_device_ids) - os.environ['WORLD_SIZE'] = f'{num_gpus * self.trainer.num_nodes}' - - self.interactive_ddp_procs = [] - for local_rank in range(1, self.trainer.num_processes): - env_copy = os.environ.copy() - env_copy['LOCAL_RANK'] = f'{local_rank}' - - # remove env var if global seed not set - if os.environ.get('PL_GLOBAL_SEED') is None and 'PL_GLOBAL_SEED' in env_copy: - del env_copy['PL_GLOBAL_SEED'] - - # start process - # if hydra is available and initialized, make sure to set the original cwd correctly - # and pass current cwd for ddp processes (which hydra has overridden) - cwd: Optional[str] = None - if _HYDRA_AVAILABLE: - if HydraConfig.initialized(): - cwd = get_original_cwd() - command += [ - f'hydra.run.dir={os.getcwd()}', - f'hydra.job.name=train_ddp_process_{local_rank}' - ] - proc = subprocess.Popen(command, env=env_copy, cwd=cwd) - self.interactive_ddp_procs.append(proc) - - # starting all processes at once can cause issues - # with dataloaders delay between 1-10 seconds - delay = np.random.uniform(1, 5, 1)[0] - sleep(delay) - - def train(self): - model = self.trainer.model - - results = self.ddp_train(process_idx=self.task_idx, model=model) - if 'WORLD_SIZE' in os.environ: - del os.environ['WORLD_SIZE'] - return results - - def training_step(self, args): - return self._step(args) - - def validation_step(self, args): - return self._step(args) - - def test_step(self, args): - return self._step(args) - - def predict(self, args): - return self._step(args) - - def _step(self, args): - args = 
self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = self.trainer.model(*args) - else: - output = self.trainer.model(*args) - return output - - def barrier(self, name: Optional[str] = None): - if self.rpc_enabled: - # Allow RPC to handle barrier on main RPC processes - self.ddp_plugin.barrier() - elif torch_distrib.is_initialized(): - torch_distrib.barrier(group=self.ddp_plugin.data_parallel_group) - - def _check_can_spawn_children(self): - if self._has_spawned_children: - raise RuntimeError( - "You tried to run `.fit` or `.test` multiple times in the same script." - " This is not supported in DDP mode, switch to `accelerator='ddp_spawn'` instead." - ) - - def set_world_ranks(self, process_idx): - self.trainer.local_rank = process_idx - self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx - self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - - def init_device(self, process_idx): - # Todo: required argument `process_idx` is not used - self.trainer.root_gpu = self.trainer.data_parallel_device_ids[self.trainer.local_rank] - torch.cuda.set_device(self.trainer.root_gpu) - - def model_to_device(self, model): - model.cuda(self.trainer.root_gpu) - - def get_device_ids(self): - device_ids = [self.trainer.root_gpu] - return device_ids - - def on_train_end(self): - pass - - def early_stopping_should_stop(self, pl_module): - stop = torch.tensor(int(self.trainer.should_stop), device=pl_module.device) - torch_distrib.all_reduce(stop, op=torch_distrib.reduce_op.SUM) - self.barrier('early_stopping') - should_stop = stop == self.trainer.world_size - return should_stop - - def broadcast(self, obj, src=0): - return self.dist.broadcast(obj, group=self.ddp_plugin.data_parallel_group) - - def ddp_train(self, process_idx, model): - """ - Entry point for ddp - - Args: - process_idx: - model: - - Returns: - Dict with evaluation results - - """ - seed = os.environ.get("PL_GLOBAL_SEED") - if seed is not None: - seed_everything(int(seed)) - - # show progressbar only on progress_rank 0 - if (self.trainer.node_rank != 0 or process_idx != 0) and self.trainer.progress_bar_callback is not None: - self.trainer.progress_bar_callback.disable() - - # determine which process we are and world size - self.set_world_ranks(process_idx) - - # set warning rank - rank_zero_only.rank = self.trainer.global_rank - - # Initialize cuda device - self.init_device(process_idx) - - # set up server using proc 0's ip address - # try to init for 20 times at max in case ports are taken - # where to store ip_table - model.trainer = self.trainer - self.init_ddp_connection( - self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks - ) - - if isinstance(self.ddp_plugin, RPCPlugin): - if not self.ddp_plugin.is_main_rpc_process: - self.ddp_plugin.on_accelerator_exit_rpc_process(self.trainer) - self.ddp_plugin.exit_rpc_process() - if self.ddp_plugin.return_after_exit_rpc_process: - return - else: - self.ddp_plugin.on_main_rpc_connection(self.trainer) - - # call setup after the ddp process has connected - self.trainer.call_setup_hook(model) - - # on world_size=0 let everyone know training is starting - if self.trainer.is_global_zero and not torch.distributed.is_initialized(): - log.info('-' * 100) - log.info(f'distributed_backend={self.trainer.distributed_backend}') - log.info(f'All DDP processes registered. 
Starting ddp with {self.trainer.world_size} processes') - log.info('-' * 100) - - # call sync_bn before .cuda(), configure_apex and configure_ddp - if self.trainer.sync_batchnorm: - model = self.configure_sync_batchnorm(model) - - # move the model to the correct device - self.model_to_device(model) - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - # 16-bit - model = self.trainer.precision_connector.connect(model) - - # device ids change depending on the DDP setup - device_ids = self.get_device_ids() - - # allow user to configure ddp - model = self.configure_ddp(model, device_ids) - - self.barrier('ddp_setup') - self.trainer.setup_trainer(model) - - # train or test - results = self.train_or_test() - - # clean up memory - torch.cuda.empty_cache() - - return results - - def configure_ddp( - self, model: LightningModule, device_ids: List[int] - ) -> DistributedDataParallel: - self.ddp_plugin.device_ids = device_ids - model = self.ddp_plugin.configure_ddp(model, device_ids) - return model - - def configure_sync_batchnorm(self, model: LightningModule) -> LightningModule: - """ - Add global batchnorm for a model spread across multiple GPUs and nodes. - - Override to synchronize batchnorm between specific process groups instead - of the whole world or use a different sync_bn like `apex`'s version. - - Args: - model: pointer to current :class:`LightningModule`. - - Return: - LightningModule with batchnorm layers synchronized between process groups - """ - model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=None) - - return model - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - """ - - """ - return sync_ddp_if_available(tensor, group, reduce_op) - - def all_gather(self, tensor: Union[torch.Tensor], group: Optional[Any] = None, sync_grads: bool = False): - """ - Function to gather a tensor from several distributed processes - - Args: - tensor: tensor of shape (batch, ...) - group: the process group to gather results from. Defaults to all processes (world) - sync_grads: flag that allows users to synchronize gradients for all_gather op - - Return: - A tensor of shape (world_size, batch, ...) - """ - return all_gather_ddp_if_available(tensor, group=group, sync_grads=sync_grads) - - def get_reference_model(self, model) -> LightningModule: - return self.ddp_plugin.get_model_from_plugin(model) - - @property - def distributed_sampler_kwargs(self): - distributed_sampler_kwargs = dict( - num_replicas=self.trainer.num_nodes * self.trainer.num_processes, - rank=self.trainer.global_rank - ) - if self.ddp_plugin is not None: - distributed_sampler_kwargs = self.ddp_plugin.distributed_sampler_kwargs(distributed_sampler_kwargs) - return distributed_sampler_kwargs - - @property - def require_distributed_sampler(self): - return True diff --git a/pytorch_lightning/accelerators/legacy/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/legacy/ddp_cpu_hpc_accelerator.py deleted file mode 100644 index 320de215bb2ae..0000000000000 --- a/pytorch_lightning/accelerators/legacy/ddp_cpu_hpc_accelerator.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-from typing import Optional
-
-from pytorch_lightning.accelerators.legacy.ddp_hpc_accelerator import DDPHPCAccelerator
-from pytorch_lightning.plugins.environments import ClusterEnvironment
-from pytorch_lightning.plugins.legacy.ddp_plugin import DDPPlugin
-
-
-class DDPCPUHPCAccelerator(DDPHPCAccelerator):
-
-    def __init__(self,
-                 trainer,
-                 cluster_environment: Optional[ClusterEnvironment] = None,
-                 ddp_plugin: Optional[DDPPlugin] = None):
-        """
-        Runs training using DDP (with CPUs) strategy on a cluster
-
-        Example::
-
-            # default
-            trainer = Trainer(accelerator=DDPCPUHPCAccelerator())
-
-        """
-        super().__init__(trainer, cluster_environment, ddp_plugin)
-        self.nickname = 'ddp_cpu'
-
-    def model_to_device(self, model):
-        model.cpu()
-
-    def get_device_ids(self):
-        device_ids = None
-        return device_ids
-
-    def init_device(self, process_idx):
-        pass
diff --git a/pytorch_lightning/accelerators/legacy/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/legacy/ddp_cpu_spawn_accelerator.py
deleted file mode 100644
index 8375c7590c312..0000000000000
--- a/pytorch_lightning/accelerators/legacy/ddp_cpu_spawn_accelerator.py
+++ /dev/null
@@ -1,295 +0,0 @@
-# Copyright The PyTorch Lightning team.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and -# limitations under the License -import os -from typing import Any, List, Optional, Union - -import torch -import torch.distributed as torch_distrib -import torch.multiprocessing as mp -from torch.nn.parallel import DistributedDataParallel - -from pytorch_lightning import _logger as log -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.distributed.dist import LightningDistributed -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.plugins.legacy.ddp_plugin import DDPPlugin -from pytorch_lightning.plugins.legacy.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import AMPType -from pytorch_lightning.utilities.distributed import ( - all_gather_ddp_if_available, - find_free_network_port, - rank_zero_only, - rank_zero_warn, - sync_ddp_if_available, -) - - -class DDPCPUSpawnAccelerator(Accelerator): - - def __init__(self, - trainer, - nprocs: int, - cluster_environment: Optional[ClusterEnvironment] = None, - ddp_plugin: Optional[DDPPlugin] = None): - """ - Runs training using DDP (on a single machine or manually on multiple machines), using mp.spawn - - Example:: - - # default - trainer = Trainer(accelerator=DDPCPUSpawnAccelerator()) - - """ - super().__init__(trainer, cluster_environment, ddp_plugin) - self.mp_queue = None - self.nprocs = nprocs - self.dist = LightningDistributed() - self.nickname = 'ddp_cpu' - - def setup(self, model): - os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', str(find_free_network_port())) - - # pass in a state q - smp = mp.get_context('spawn') - self.mp_queue = smp.SimpleQueue() - - self.trainer.model = model - - def train(self): - model = self.trainer.model - - # train in children process - mp.spawn(self.ddp_train, nprocs=self.nprocs, args=(self.mp_queue, model,)) - - # restore main state with best weights - best_path = self.mp_queue.get() - results = self.mp_queue.get() - - # recover the weights of the processes trained in the children - self.__recover_child_process_weights(model, best_path) - return results - - def ddp_train(self, process_idx, mp_queue, model): - """ - Entry point for ddp - - Args: - process_idx: - mp_queue: multiprocessing queue - model: - """ - # show progressbar only on progress_rank 0 - if (self.trainer.node_rank != 0 or process_idx != 0) and self.trainer.progress_bar_callback is not None: - self.trainer.progress_bar_callback.disable() - - # determine which process we are and world size - self.set_world_ranks(process_idx) - - # set warning rank - rank_zero_only.rank = self.trainer.global_rank - - # set up server using proc 0's ip address - # try to init for 20 times at max in case ports are taken - # where to store ip_table - model.trainer = self.trainer - self.init_ddp_connection( - self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks - ) - - if isinstance(self.ddp_plugin, RPCPlugin): - if not self.ddp_plugin.is_main_rpc_process: - self.ddp_plugin.on_accelerator_exit_rpc_process(self.trainer) - self.ddp_plugin.exit_rpc_process() - if self.ddp_plugin.return_after_exit_rpc_process: - return - else: - self.ddp_plugin.on_main_rpc_connection(self.trainer) - - # call setup after the ddp process has connected - self.trainer.call_setup_hook(model) - - # on world_size=0 let everyone know training is starting - if self.trainer.is_global_zero and not 
torch.distributed.is_initialized(): - log.info('-' * 100) - log.info(f'distributed_backend={self.trainer.distributed_backend}') - log.info(f'All DDP processes registered. Starting ddp with {self.trainer.world_size} processes') - log.info('-' * 100) - - # call sync_bn before .cuda(), configure_apex and configure_ddp - if self.trainer.sync_batchnorm: - model = self.configure_sync_batchnorm(model) - - # move the model to the correct device - self.model_to_device(model, process_idx) - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - self.ddp_plugin.on_after_setup_optimizers(self.trainer) - - # 16-bit - model = self.trainer.precision_connector.connect(model) - - # DDP spawn already spawned off each process... no need to do anything - device_ids = self.get_device_ids() - - # allow user to configure ddp - model = self.configure_ddp(model, device_ids) - - self.trainer.setup_trainer(model) - - # train or test - results = self.train_or_test() - - # get original model - model = self.trainer.get_model() - - # persist info in ddp_spawn - self.transfer_distrib_spawn_state_on_fit_end(model, mp_queue, results) - - # clean up memory - torch.cuda.empty_cache() - - def training_step(self, args): - return self._step(args) - - def validation_step(self, args): - return self._step(args) - - def test_step(self, args): - return self._step(args) - - def predict(self, args): - return self._step(args) - - def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = self.trainer.model(*args) - else: - output = self.trainer.model(*args) - return output - - def barrier(self, name: Optional[str] = None): - if torch_distrib.is_initialized(): - torch_distrib.barrier() - - def broadcast(self, obj, src=0): - return self.dist.broadcast(obj) - - def early_stopping_should_stop(self, pl_module): - stop = torch.tensor(int(self.trainer.should_stop), device=pl_module.device) - torch_distrib.all_reduce(stop, op=torch_distrib.reduce_op.SUM) - torch_distrib.barrier() - should_stop = stop == self.trainer.world_size - return should_stop - - def set_world_ranks(self, process_idx): - self.trainer.local_rank = process_idx - self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx - self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - - def model_to_device(self, model, process_idx): - # Todo: required argument `process_idx` is not used - model.cpu() - - def get_device_ids(self): - device_ids = None - return device_ids - - def __recover_child_process_weights(self, model, best_path): - # transfer back the best path to the trainer - if self.trainer.checkpoint_callback: - self.trainer.checkpoint_callback.best_model_path = best_path - - self.trainer.model = model - - def transfer_distrib_spawn_state_on_fit_end(self, model, mp_queue, results): - # Todo: required argument `model` is not used - # track the best model path - best_model_path = None - if self.trainer.checkpoint_callback is not None: - best_model_path = self.trainer.checkpoint_callback.best_model_path - - if self.trainer.global_rank == 0 and mp_queue is not None: - rank_zero_warn('cleaning up ddp environment...') - # todo, pass complete checkpoint as state dictionary - mp_queue.put(best_model_path) - mp_queue.put(results) - - def configure_ddp( - self, model: LightningModule, device_ids: List[int] - ) -> DistributedDataParallel: - 
self.ddp_plugin.device_ids = device_ids - model = self.ddp_plugin.configure_ddp(model, device_ids) - return model - - def configure_sync_batchnorm(self, model: LightningModule) -> LightningModule: - """ - Add global batchnorm for a model spread across multiple GPUs and nodes. - - Override to synchronize batchnorm between specific process groups instead - of the whole world or use a different sync_bn like `apex`'s version. - - Args: - model: pointer to current :class:`LightningModule`. - - Return: - LightningModule with batchnorm layers synchronized between process groups - """ - model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=None) - - return model - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - return sync_ddp_if_available(tensor, group, reduce_op) - - def all_gather(self, tensor: Union[torch.Tensor], group: Optional[Any] = None, sync_grads: bool = False): - """ - Function to gather a tensor from several distributed processes - - Args: - tensor: tensor of shape (batch, ...) - group: the process group to gather results from. Defaults to all processes (world) - sync_grads: flag that allows users to synchronize gradients for all_gather op - - Return: - A tensor of shape (world_size, batch, ...) - """ - return all_gather_ddp_if_available(tensor, group=group, sync_grads=sync_grads) - - def get_reference_model(self, model) -> LightningModule: - return self.ddp_plugin.get_model_from_plugin(model) - - @property - def distributed_sampler_kwargs(self): - distributed_sampler_kwargs = dict( - num_replicas=self.trainer.num_nodes * self.trainer.num_processes, - rank=self.trainer.global_rank - ) - if self.ddp_plugin is not None: - distributed_sampler_kwargs = self.ddp_plugin.distributed_sampler_kwargs(distributed_sampler_kwargs) - return distributed_sampler_kwargs - - @property - def require_distributed_sampler(self): - return True diff --git a/pytorch_lightning/accelerators/legacy/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/legacy/ddp_hpc_accelerator.py deleted file mode 100644 index 113405baa0d5c..0000000000000 --- a/pytorch_lightning/accelerators/legacy/ddp_hpc_accelerator.py +++ /dev/null @@ -1,255 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License -from typing import Any, List, Optional, Union - -import torch -import torch.distributed as dist -import torch.distributed as torch_distrib -from torch.nn.parallel import DistributedDataParallel - -from pytorch_lightning import _logger as log -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.distributed.dist import LightningDistributed -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.plugins.legacy.ddp_plugin import DDPPlugin -from pytorch_lightning.plugins.legacy.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import AMPType -from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available, rank_zero_only, sync_ddp_if_available - - -class DDPHPCAccelerator(Accelerator): - - def __init__(self, - trainer, - cluster_environment: Optional[ClusterEnvironment] = None, - ddp_plugin: Optional[DDPPlugin] = None): - """ - Runs training using DDP on an HPC cluster - - Example:: - - # default - trainer = Trainer(accelerator=DDPHPCAccelerator()) - - """ - super().__init__(trainer, cluster_environment, ddp_plugin) - self.task_idx = None - self._has_spawned_children = False - self.dist = LightningDistributed() - self.nickname = 'ddp' - - def setup(self, model): - self.trainer.model = model - self.task_idx = self.cluster_environment.local_rank() - - def train(self): - model = self.trainer.model - self.ddp_train(process_idx=self.task_idx, model=model) - - def set_world_ranks(self, process_idx): - self.trainer.local_rank = process_idx - self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx - self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - - def init_device(self, process_idx): - self.trainer.root_gpu = process_idx - torch.cuda.set_device(self.trainer.root_gpu) - - def model_to_device(self, model): - model.cuda(self.trainer.root_gpu) - - def get_device_ids(self): - device_ids = [self.trainer.root_gpu] - return device_ids - - def training_step(self, args): - return self._step(args) - - def validation_step(self, args): - return self._step(args) - - def test_step(self, args): - return self._step(args) - - def predict(self, args): - return self._step(args) - - def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = self.trainer.model(*args) - else: - output = self.trainer.model(*args) - return output - - def barrier(self, name: Optional[str] = None): - if torch_distrib.is_initialized(): - torch_distrib.barrier() - - def early_stopping_should_stop(self, pl_module): - stop = torch.tensor(int(self.trainer.should_stop), device=pl_module.device) - dist.all_reduce(stop, op=dist.reduce_op.SUM) - dist.barrier() - should_stop = stop == self.trainer.world_size - return should_stop - - def broadcast(self, obj, src=0): - return self.dist.broadcast(obj) - - def ddp_train(self, process_idx, model): - """ - Entry point for ddp - - Args: - process_idx: - model: - - Returns: - Dict with evaluation results - - """ - # determine which process we are and world size - self.set_world_ranks(process_idx) - self.init_device(process_idx) - - # toggle prog bar - if (self.trainer.node_rank != 0 or process_idx != 0) and 
self.trainer.progress_bar_callback is not None: - self.trainer.progress_bar_callback.disable() - - # set warning rank - rank_zero_only.rank = self.trainer.global_rank - - # set up server using proc 0's ip address - # try to init for 20 times at max in case ports are taken - # where to store ip_table - model.trainer = self.trainer - self.init_ddp_connection( - self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks - ) - - if isinstance(self.ddp_plugin, RPCPlugin): - if not self.ddp_plugin.is_main_rpc_process: - self.ddp_plugin.on_accelerator_exit_rpc_process(self.trainer) - self.ddp_plugin.exit_rpc_process() - if self.ddp_plugin.return_after_exit_rpc_process: - return - else: - self.ddp_plugin.on_main_rpc_connection(self.trainer) - - # call setup after the ddp process has connected - self.trainer.call_setup_hook(model) - - # on world_size=0 let everyone know training is starting - if self.trainer.is_global_zero and not torch.distributed.is_initialized(): - log.info('-' * 100) - log.info(f'distributed_backend={self.trainer.distributed_backend}') - log.info(f'All DDP processes registered. Starting ddp with {self.trainer.world_size} processes') - log.info('-' * 100) - - # call sync_bn before .cuda(), configure_apex and configure_ddp - if self.trainer.sync_batchnorm: - model = self.configure_sync_batchnorm(model) - - # move the model to the correct device - self.model_to_device(model) - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - self.ddp_plugin.on_after_setup_optimizers(self.trainer) - - # 16-bit - model = self.trainer.precision_connector.connect(model) - - # device ids change depending on the DDP setup - device_ids = self.get_device_ids() - - # allow user to configure ddp - model = self.configure_ddp(model, device_ids) - - self.trainer.setup_trainer(model) - - # train or test - results = self.train_or_test() - - # clean up memory - torch.cuda.empty_cache() - - return results - - def configure_ddp( - self, model: LightningModule, device_ids: List[int] - ) -> DistributedDataParallel: - self.ddp_plugin.device_ids = device_ids - model = self.ddp_plugin.configure_ddp(model, device_ids) - return model - - def configure_sync_batchnorm(self, model: LightningModule) -> LightningModule: - """ - Add global batchnorm for a model spread across multiple GPUs and nodes. - - Override to synchronize batchnorm between specific process groups instead - of the whole world or use a different sync_bn like `apex`'s version. - - Args: - model: pointer to current :class:`LightningModule`. - - Return: - LightningModule with batchnorm layers synchronized between process groups - """ - model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=None) - - return model - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - return sync_ddp_if_available(tensor, group, reduce_op) - - def all_gather(self, tensor: Union[torch.Tensor], group: Optional[Any] = None, sync_grads: bool = False): - """ - Function to gather a tensor from several distributed processes - - Args: - tensor: tensor of shape (batch, ...) - group: the process group to gather results from. Defaults to all processes (world) - sync_grads: flag that allows users to synchronize gradients for all_gather op - - Return: - A tensor of shape (world_size, batch, ...) 
- """ - return all_gather_ddp_if_available(tensor, group=group, sync_grads=sync_grads) - - def get_reference_model(self, model) -> LightningModule: - return self.ddp_plugin.get_model_from_plugin(model) - - @property - def distributed_sampler_kwargs(self): - distributed_sampler_kwargs = dict( - num_replicas=self.trainer.num_nodes * self.trainer.num_processes, - rank=self.trainer.global_rank - ) - if self.ddp_plugin is not None: - distributed_sampler_kwargs = self.ddp_plugin.distributed_sampler_kwargs(distributed_sampler_kwargs) - return distributed_sampler_kwargs - - @property - def require_distributed_sampler(self): - return True diff --git a/pytorch_lightning/accelerators/legacy/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/legacy/ddp_spawn_accelerator.py deleted file mode 100644 index fba425d2672eb..0000000000000 --- a/pytorch_lightning/accelerators/legacy/ddp_spawn_accelerator.py +++ /dev/null @@ -1,327 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License -import os -import re -from typing import Any, List, Optional, Union - -import torch -import torch.distributed as torch_distrib -import torch.multiprocessing as mp -from torch.nn.parallel import DistributedDataParallel - -from pytorch_lightning import _logger as log -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.distributed import LightningDistributed -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.plugins.legacy.ddp_plugin import DDPPlugin -from pytorch_lightning.plugins.legacy.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import AMPType -from pytorch_lightning.utilities.cloud_io import atomic_save -from pytorch_lightning.utilities.cloud_io import load as pl_load -from pytorch_lightning.utilities.distributed import ( - all_gather_ddp_if_available, - find_free_network_port, - rank_zero_only, - rank_zero_warn, - sync_ddp_if_available, -) -from pytorch_lightning.utilities.seed import seed_everything - - -class DDPSpawnAccelerator(Accelerator): - - def __init__(self, - trainer, - nprocs: int, - cluster_environment: Optional[ClusterEnvironment] = None, - ddp_plugin: Optional[DDPPlugin] = None): - """ - Runs training using DDP using mp.spawn via manual launch (not cluster launch) - - Example:: - - # default - trainer = Trainer(accelerator=DDPSpawnAccelerator()) - - """ - super().__init__(trainer, cluster_environment, ddp_plugin) - self.mp_queue = None - self.nprocs = nprocs - self.dist = LightningDistributed() - self.nickname = 'ddp' - - def setup(self, model): - os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', str(find_free_network_port())) - - # pass in a state q - smp = mp.get_context('spawn') - self.mp_queue = smp.SimpleQueue() - - self.trainer.model = model - - def train(self): - model = self.trainer.model - - # train in children process - mp.spawn(self.ddp_train, nprocs=self.nprocs, 
args=(self.mp_queue, model,)) - - # restore main state with best weights - best_path = self.mp_queue.get() - results = self.mp_queue.get() - last_path = self.mp_queue.get() - - # recover the weights of the processes trained in the children - self.__recover_child_process_weights(model, best_path, last_path) - return results - - def ddp_train(self, process_idx, mp_queue, model, is_master: bool = False, proc_offset: int = 0): - """ - Entry point for ddp - - Args: - process_idx: - mp_queue: multiprocessing queue - model: - """ - seed = os.environ.get("PL_GLOBAL_SEED") - if seed is not None: - seed_everything(int(seed)) - - # offset the process id if requested - process_idx = process_idx + proc_offset - - # show progressbar only on progress_rank 0 - if (self.trainer.node_rank != 0 or process_idx != 0) and self.trainer.progress_bar_callback is not None: - self.trainer.progress_bar_callback.disable() - - # determine which process we are and world size - self.set_world_ranks(process_idx) - - # set warning rank - rank_zero_only.rank = self.trainer.global_rank - - # Initialize cuda device - self.init_device(process_idx, is_master) - - # set up server using proc 0's ip address - # try to init for 20 times at max in case ports are taken - # where to store ip_table - model.trainer = self.trainer - self.init_ddp_connection( - self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks - ) - - if isinstance(self.ddp_plugin, RPCPlugin): - if not self.ddp_plugin.is_main_rpc_process: - self.ddp_plugin.on_accelerator_exit_rpc_process(self.trainer) - self.ddp_plugin.exit_rpc_process() - if self.ddp_plugin.return_after_exit_rpc_process: - return - else: - self.ddp_plugin.on_main_rpc_connection(self.trainer) - - # call setup after the ddp process has connected - self.trainer.call_setup_hook(model) - - # on world_size=0 let everyone know training is starting - if self.trainer.is_global_zero and not torch.distributed.is_initialized(): - log.info('-' * 100) - log.info(f'distributed_backend={self.trainer.distributed_backend}') - log.info(f'All DDP processes registered. 
Starting ddp with {self.trainer.world_size} processes') - log.info('-' * 100) - - # call sync_bn before .cuda(), configure_apex and configure_ddp - if self.trainer.sync_batchnorm: - model = self.configure_sync_batchnorm(model) - - # move the model to the correct device - self.model_to_device(model) - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - self.ddp_plugin.on_after_setup_optimizers(self.trainer) - - # 16-bit - model = self.trainer.precision_connector.connect(model) - - # device ids change depending on the DDP setup - device_ids = self.get_device_ids() - - # allow user to configure ddp - model = self.configure_ddp(model, device_ids) - - self.trainer.setup_trainer(model) - - # train or test - results = self.train_or_test() - - # get original model - model = self.trainer.get_model() - - # persist info in ddp_spawn - self.transfer_distrib_spawn_state_on_fit_end(model, mp_queue, results) - - # clean up memory - torch.cuda.empty_cache() - - def set_world_ranks(self, process_idx): - self.trainer.local_rank = process_idx - self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx - self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - - def init_device(self, process_idx, is_master): - # Todo: required argument `process_idx` is not used - # Todo: required argument `is_master` is not used - gpu_idx = self.trainer.data_parallel_device_ids[self.trainer.local_rank] - self.trainer.root_gpu = gpu_idx - torch.cuda.set_device(self.trainer.root_gpu) - - def model_to_device(self, model): - model.cuda(self.trainer.root_gpu) - - def get_device_ids(self): - device_ids = [self.trainer.root_gpu] - return device_ids - - def training_step(self, args): - return self._step(args) - - def validation_step(self, args): - return self._step(args) - - def test_step(self, args): - return self._step(args) - - def predict(self, args): - return self._step(args) - - def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = self.trainer.model(*args) - else: - output = self.trainer.model(*args) - return output - - def barrier(self, name: Optional[str] = None): - if torch_distrib.is_initialized(): - torch_distrib.barrier() - - def early_stopping_should_stop(self, pl_module): - stop = torch.tensor(int(self.trainer.should_stop), device=pl_module.device) - torch_distrib.all_reduce(stop, op=torch_distrib.reduce_op.SUM) - torch_distrib.barrier() - should_stop = stop == self.trainer.world_size - return should_stop - - def broadcast(self, obj, src=0): - return self.dist.broadcast(obj) - - def __recover_child_process_weights(self, model, best_path, last_path): - # transfer back the best path to the trainer - if self.trainer.checkpoint_callback: - self.trainer.checkpoint_callback.best_model_path = best_path - # todo, pass also best score - - # load last weights - if last_path is not None and not self.trainer.testing: - ckpt = pl_load(last_path, map_location=lambda storage, loc: storage) - model.load_state_dict(ckpt) - - self.trainer.model = model - - def transfer_distrib_spawn_state_on_fit_end(self, model, mp_queue, results): - best_model_path = None - if self.trainer.checkpoint_callback is not None: - best_model_path = self.trainer.checkpoint_callback.best_model_path - - if self.trainer.global_rank == 0 and mp_queue is not None: - rank_zero_warn('cleaning up ddp environment...') - # todo, pass 
complete checkpoint as state dictionary - mp_queue.put(best_model_path) - mp_queue.put(results) - - # save the last weights - last_path = None - if not self.trainer.testing and best_model_path is not None and len(best_model_path) > 0: - last_path = re.sub('.ckpt', '.tmp_end.ckpt', best_model_path) - atomic_save(model.state_dict(), last_path) - mp_queue.put(last_path) - - def configure_ddp( - self, model: LightningModule, device_ids: List[int] - ) -> DistributedDataParallel: - self.ddp_plugin.device_ids = device_ids - model = self.ddp_plugin.configure_ddp(model, device_ids) - return model - - def configure_sync_batchnorm(self, model: LightningModule) -> LightningModule: - """ - Add global batchnorm for a model spread across multiple GPUs and nodes. - - Override to synchronize batchnorm between specific process groups instead - of the whole world or use a different sync_bn like `apex`'s version. - - Args: - model: pointer to current :class:`LightningModule`. - - Return: - LightningModule with batchnorm layers synchronized between process groups - """ - model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=None) - - return model - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - return sync_ddp_if_available(tensor, group, reduce_op) - - def all_gather(self, tensor: Union[torch.Tensor], group: Optional[Any] = None, sync_grads: bool = False): - """ - Function to gather a tensor from several distributed processes - - Args: - tensor: tensor of shape (batch, ...) - group: the process group to gather results from. Defaults to all processes (world) - sync_grads: flag that allows users to synchronize gradients for all_gather op - - Return: - A tensor of shape (world_size, batch, ...) - """ - return all_gather_ddp_if_available(tensor, group=group, sync_grads=sync_grads) - - def get_reference_model(self, model) -> LightningModule: - return self.ddp_plugin.get_model_from_plugin(model) - - @property - def distributed_sampler_kwargs(self): - distributed_sampler_kwargs = dict( - num_replicas=self.trainer.num_nodes * self.trainer.num_processes, - rank=self.trainer.global_rank - ) - if self.ddp_plugin is not None: - distributed_sampler_kwargs = self.ddp_plugin.distributed_sampler_kwargs(distributed_sampler_kwargs) - return distributed_sampler_kwargs - - @property - def require_distributed_sampler(self): - return True diff --git a/pytorch_lightning/accelerators/legacy/dp_accelerator.py b/pytorch_lightning/accelerators/legacy/dp_accelerator.py deleted file mode 100644 index e5b5c1d6bd6d5..0000000000000 --- a/pytorch_lightning/accelerators/legacy/dp_accelerator.py +++ /dev/null @@ -1,157 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from typing import Optional - -import torch - -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.core.step_result import Result -from pytorch_lightning.distributed import LightningDistributed -from pytorch_lightning.overrides.data_parallel import LightningParallelModule -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.utilities import AMPType -from pytorch_lightning.utilities.exceptions import MisconfigurationException - - -class DataParallelAccelerator(Accelerator): - - def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None): - """ - Runs training using DP via manual start (not HPC cluster) - - Example:: - - # default - trainer = Trainer(accelerator=DataParallelAccelerator()) - - """ - super().__init__(trainer, cluster_environment) - self.model_autocast_original_forward = None - self.dist = LightningDistributed() - self.nickname = 'dp' - - def setup(self, model): - # call setup after the ddp process has connected - self.trainer.call_setup_hook(model) - - # put model on correct device - model.cuda(self.trainer.root_gpu) - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - # init torch data parallel - model = self.__init_torch_data_parallel(model) - - # hack forward to do autocast for the user - self.model_autocast_original_forward = model.forward - - # init half precision - if self.trainer.amp_backend: - model = self.__init_half_precision(model) - - self.trainer.model = model - - def __init_torch_data_parallel(self, model): - # create list of device ids - device_ids = self.trainer.data_parallel_device_ids - if isinstance(device_ids, int): - device_ids = list(range(device_ids)) - - # set dp device - torch.cuda.set_device(self.trainer.root_gpu) - model = torch.nn.DataParallel(LightningParallelModule(model), device_ids=device_ids) - return model - - def __init_half_precision(self, model): - if self.trainer.amp_backend == AMPType.NATIVE: - self.__init_native_amp(model) - else: - model = self.__init_nvidia_apex(model) - return model - - def __init_native_amp(self, model): - model.forward = torch.cuda.amp.autocast()(model.forward) - - def __init_nvidia_apex(self, model): - # check for this bug (amp + dp + !01 doesn't work) - # https://github.com/NVIDIA/apex/issues/227 - if self.trainer.amp_level == 'O2': - raise MisconfigurationException( - f'Amp level {self.trainer.amp_level} with DataParallel is not supported.' - f' See this note from NVIDIA for more info: https://github.com/NVIDIA/apex/issues/227.' 
- f' We recommend you switch to ddp if you want to use amp') - else: - model = self.trainer.precision_connector.connect(model) - - return model - - def teardown(self): - # replace the original fwd function - self.trainer.model.forward = self.model_autocast_original_forward - self.barrier() - - def _step(self, args): - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = self.trainer.model(*args) - else: - output = self.trainer.model(*args) - return output - - def training_step(self, args): - return self._step(args) - - def validation_step(self, args): - return self._step(args) - - def test_step(self, args): - return self._step(args) - - def predict(self, args): - return self._step(args) - - def training_step_end(self, output): - if isinstance(output, Result): - output.dp_reduce() - elif isinstance(output, torch.Tensor): - output = output.mean() - return output - - def validation_step_end(self, output): - if isinstance(output, Result): - output.dp_reduce() - elif isinstance(output, torch.Tensor): - output = output.mean() - return output - - def test_step_end(self, output): - if isinstance(output, Result): - output.dp_reduce() - elif isinstance(output, torch.Tensor): - output = output.mean() - return output - - def get_reference_model(self, model) -> LightningModule: - if isinstance(model, torch.nn.DataParallel): - model = model.module - if isinstance(model, LightningParallelModule): - model = model.module - return model - - @property - def require_distributed_sampler(self): - return False diff --git a/pytorch_lightning/accelerators/legacy/gpu_accelerator.py b/pytorch_lightning/accelerators/legacy/gpu_accelerator.py deleted file mode 100644 index fecebd1d82bb2..0000000000000 --- a/pytorch_lightning/accelerators/legacy/gpu_accelerator.py +++ /dev/null @@ -1,99 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from typing import Any, Callable, Optional, Union - -import torch - -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp -from pytorch_lightning.distributed.dist import LightningDistributed -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.utilities import AMPType - - -class GPUAccelerator(Accelerator): - amp_backend: AMPType - - def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None): - """ - Runs training using a single GPU - - Example:: - - # default - trainer = Trainer(accelerator=GPUAccelerator()) - - """ - super().__init__(trainer, cluster_environment) - self.dist = LightningDistributed() - self.nickname = None - - def setup(self, model): - - # call setup - self.trainer.call_setup_hook(model) - - torch.cuda.set_device(self.trainer.root_gpu) - model.cuda(self.trainer.root_gpu) - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - # 16-bit - model = self.trainer.precision_connector.connect(model) - - self.trainer.model = model - - def _step(self, model_step: Callable, args): - args[0] = self.to_device(args[0]) - - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = model_step(*args) - else: - output = model_step(*args) - - return output - - def training_step(self, args): - return self._step(self.trainer.model.training_step, args) - - def validation_step(self, args): - return self._step(self.trainer.model.validation_step, args) - - def test_step(self, args): - return self._step(self.trainer.model.test_step, args) - - def predict(self, args): - return self._step(self.trainer.model.predict, args) - - def to_device(self, batch): - gpu_id = 0 - if isinstance(self.trainer.data_parallel_device_ids, list): - gpu_id = self.trainer.data_parallel_device_ids[0] - - # Don't copy the batch since there is a single gpu that the batch could - # be referenced from and if there are multiple optimizers the batch will - # wind up copying it to the same device repeatedly. - return self.batch_to_device(batch, gpu_id) - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - return tensor - - @property - def require_distributed_sampler(self): - return False diff --git a/pytorch_lightning/accelerators/legacy/horovod_accelerator.py b/pytorch_lightning/accelerators/legacy/horovod_accelerator.py deleted file mode 100644 index 8553b0958d59c..0000000000000 --- a/pytorch_lightning/accelerators/legacy/horovod_accelerator.py +++ /dev/null @@ -1,198 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from contextlib import ExitStack -from typing import Any, Callable, Optional, Union - -import torch -from torch.optim.lr_scheduler import _LRScheduler - -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.utilities import _HOROVOD_AVAILABLE, AMPType, DeviceType -from pytorch_lightning.utilities.distributed import rank_zero_only - -if _HOROVOD_AVAILABLE: - import horovod.torch as hvd - - -class HorovodAccelerator(Accelerator): - amp_backend: AMPType - - def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None): - """ - Runs training using horovod - - Example:: - - # default - trainer = Trainer(accelerator=HorovodAccelerator()) - - """ - super().__init__(trainer, cluster_environment) - self.nickname = 'horovod' - - def setup(self, model): - # call setup after the ddp process has connected - self.trainer.call_setup_hook(model) - - if torch.cuda.is_available() and self.trainer._device_type == DeviceType.GPU: - # Horovod: pin GPU to local rank - assert self.trainer.root_gpu == hvd.local_rank() - torch.cuda.set_device(self.trainer.root_gpu) - model.cuda(self.trainer.root_gpu) - - # avoid duplicating progress bar - if hvd.rank() != 0 and self.trainer.progress_bar_callback is not None: - self.trainer.progress_bar_callback.disable() - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - # Horovod: scale the learning rate by the number of workers to account for - # increased total batch size - for optimizer in self.trainer.optimizers: - for param_group in optimizer.param_groups: - param_group['lr'] *= hvd.size() - - # Horovod: adjust base LR used by schedulers to match scaled optimizer initial LR - for scheduler in self.trainer.lr_schedulers: - scheduler = scheduler['scheduler'] - if isinstance(scheduler, _LRScheduler): - scheduler.base_lrs = [lr * hvd.size() for lr in scheduler.base_lrs] - - # Horovod: broadcast parameters & optimizer state to ensure consistent initialization - hvd.broadcast_parameters(model.state_dict(), root_rank=0) - for optimizer in self.trainer.optimizers: - hvd.broadcast_optimizer_state(optimizer, root_rank=0) - - def _filter_named_parameters(model, optimizer): - opt_params = set([p for group in optimizer.param_groups for p in group.get('params', [])]) - return [(name, p) for name, p in model.named_parameters() if p in opt_params] - - # Horovod: wrap optimizers to perform gradient aggregation via allreduce - self.trainer.optimizers = [ - hvd.DistributedOptimizer(optimizer, named_parameters=_filter_named_parameters(model, optimizer)) - for optimizer in self.trainer.optimizers - ] - - # 16-bit - model = self.trainer.precision_connector.connect(model) - - # Update logger rank info from Horovod to avoid race conditions from different ranks - # creating directories / writing files in the same locations. 
- self.trainer.global_rank = hvd.rank() - rank_zero_only.rank = self.trainer.global_rank - - self.trainer.model = model - - def train(self): - with ExitStack() as stack: - for optimizer in self.trainer.optimizers: - # Synchronization will be performed explicitly following backward() - stack.enter_context(optimizer.skip_synchronize()) - - self.trainer.setup_trainer(self.trainer.model) - - # train or test - results = self.train_or_test() - - # Make sure all workers have finished training before returning to the user - hvd.join() - return results - - def _step(self, model_step: Callable, args): - if self.trainer._device_type == DeviceType.GPU: - args[0] = self.batch_to_device(args[0], hvd.local_rank()) - - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = model_step(*args) - else: - output = model_step(*args) - - return output - - def training_step(self, args): - return self._step(self.trainer.model.training_step, args) - - def validation_step(self, args): - return self._step(self.trainer.model.validation_step, args) - - def test_step(self, args): - return self._step(self.trainer.model.test_step, args) - - def predict(self, args): - return self._step(self.trainer.model.predict, args) - - def backward(self, closure_loss, optimizer, opt_idx, *args, **kwargs): - super().backward(closure_loss, optimizer, opt_idx, *args, **kwargs) - optimizer.synchronize() - - def on_train_epoch_end(self, outputs): - hvd.join(hvd.local_rank() if self.trainer._device_type == DeviceType.GPU else -1) - - def barrier(self, name: Optional[str] = None): - hvd.join() - - def broadcast(self, obj, src=0): - self.barrier() - obj = hvd.broadcast_object(obj, src) - return obj - - def gather_all_tensors(self, result: Union[torch.Tensor], group: Optional[Any] = None): - if group is not None: - raise ValueError( - "Horovod does not support allgather using a subcommunicator at this time. " - "Unset `group`." - ) - - if len(result.shape) == 0: - # Convert scalars to single dimension tensors - result = result.reshape(1) - - # sync and gather all - hvd.join() - gathered = hvd.allgather(result) - gathered_result = list(gathered.split(1, dim=0)) - return gathered_result - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - if group is not None: - raise ValueError( - "Horovod does not support allreduce using a subcommunicator at this time. " - "Unset `group`." - ) - - if reduce_op is None or reduce_op == "sum": - reduce_op = hvd.Sum - elif isinstance(reduce_op, str) and reduce_op in ("avg", "mean"): - reduce_op = hvd.Average - else: - raise ValueError(f"unrecognized `reduce_op`: {reduce_op}") - - # sync all processes before reduction - hvd.join() - return hvd.allreduce(tensor, op=reduce_op) - - @property - def distributed_sampler_kwargs(self): - return dict(num_replicas=hvd.size(), rank=hvd.rank()) - - @property - def require_distributed_sampler(self): - return True diff --git a/pytorch_lightning/accelerators/legacy/tpu_accelerator.py b/pytorch_lightning/accelerators/legacy/tpu_accelerator.py deleted file mode 100644 index 71a9edecf4c34..0000000000000 --- a/pytorch_lightning/accelerators/legacy/tpu_accelerator.py +++ /dev/null @@ -1,345 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import io -import os -from typing import Any, Callable, Optional, Union - -import torch -import torch.multiprocessing as mp -from torch.optim import Optimizer - -from pytorch_lightning import _logger as log -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp -from pytorch_lightning.core import LightningModule -from pytorch_lightning.plugins.environments import ClusterEnvironment -from pytorch_lightning.utilities import ( - _TPU_AVAILABLE, - move_data_to_device, - rank_zero_info, - rank_zero_only, - rank_zero_warn, -) -from pytorch_lightning.utilities.exceptions import MisconfigurationException - -if _TPU_AVAILABLE: - import torch_xla - import torch_xla.core.xla_model as xm - import torch_xla.distributed.parallel_loader as xla_pl - import torch_xla.distributed.xla_multiprocessing as xmp - - -class TPUAccelerator(Accelerator): - - def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None): - """ - Runs training using TPUs (colab, single machine or pod) - - Example:: - - # default - trainer = Trainer(accelerator=TPUAccelerator()) - - """ - super().__init__(trainer, cluster_environment) - self.start_method = None - self.mp_queue = None - self.nickname = None - - def setup(self, model): - rank_zero_info(f'training on {self.trainer.tpu_cores} TPU cores') - - # TODO: Move this check to Trainer __init__ or device parser - if not _TPU_AVAILABLE: - raise MisconfigurationException('PyTorch XLA not installed.') - - # see: https://discuss.pytorch.org/t/segfault-with-multiprocessing-queue/81292/2 - self.start_method = 'fork' - - # pass in a state q - smp = mp.get_context(self.start_method) - self.mp_queue = smp.SimpleQueue() - - self.trainer.model = model - - def teardown(self): - model = self.trainer.model - - # restore main state with best weights - best_path = self.mp_queue.get() - results = self.mp_queue.get() - last_path = self.mp_queue.get() - - # transfer back the best path to the trainer - if self.trainer.checkpoint_callback is not None: - self.trainer.checkpoint_callback.best_model_path = best_path - # todo, pass also bets score - - # load last weights - if last_path and not self.trainer.testing: - ckpt = torch.load(last_path, map_location=lambda storage, loc: storage) - model.load_state_dict(ckpt) - - self.trainer.model = model - - # when training completes, load the weights back in main process - self.__load_weights_on_main_process() - return results - - def train(self): - model = self.trainer.model - - # train - if self.trainer.tpu_id is not None: - self.tpu_train_in_process(self.trainer.tpu_id, model, self.trainer, self.mp_queue) - else: - xmp.spawn( - self.tpu_train_in_process, - args=(model, self.trainer, self.mp_queue), - nprocs=self.trainer.tpu_cores, - start_method=self.start_method - ) - - def __load_weights_on_main_process(self): - model = self.trainer.model - - # load weights if not interrupted - if self.trainer.on_colab_kaggle and not self.trainer.testing: - self.load_spawn_weights(model) - - self.trainer.model = model - - def tpu_train_in_process(self, tpu_core_idx: int, model: LightningModule, trainer=None, 
mp_queue=None): - """ - Here we are inside each individual process - """ - # Todo: required argument `tpu_core_idx` is not used - if not trainer: - trainer = self.trainer - - trainer.call_setup_hook(model) - - # setup TPU training - self.__setup_tpu_training(model, trainer) - - self.trainer.setup_trainer(model) - - # train or test - results = self.train_or_test() - - # save weights at the end of training - self.__save_end_of_training_weights(model, trainer) - - # persist info in spawn - self.transfer_distrib_spawn_state_on_fit_end(model, mp_queue, results) - - def _step(self, model_step: Callable, args): - args[0] = self.to_device(args[0]) - return model_step(*args) - - def training_step(self, args): - return self._step(self.trainer.model.training_step, args) - - def validation_step(self, args): - return self._step(self.trainer.model.validation_step, args) - - def test_step(self, args): - return self._step(self.trainer.model.test_step, args) - - def predict(self, args): - return self._step(self.trainer.model.predict, args) - - def process_dataloader(self, dataloader): - device = xm.xla_device(self.trainer.tpu_id) - dataloader = xla_pl.ParallelLoader(dataloader, [device]) - dataloader = dataloader.per_device_loader(device) - return dataloader - - def to_device(self, batch): - """ - Transfers the data to the TPU. - - Args: - batch: A tensor or collection of tensors. - - Return: - the tensor on the TPU device. - - See Also: - - :func:`~pytorch_lightning.utilities.apply_func.move_data_to_device` - """ - if not _TPU_AVAILABLE: - raise MisconfigurationException( - 'Requested to transfer batch to TPU but XLA is not available.' - ' Are you sure this machine has TPUs?' - ) - device = xm.xla_device(self.trainer.tpu_id) - - return self.batch_to_device(batch, device) - - def __save_end_of_training_weights(self, model: LightningModule, trainer): - # when training ends on these platforms dump weights to get out of the main process - if trainer.on_colab_kaggle: - rank_zero_warn('cleaning up... 
please do not interrupt') - self.save_spawn_weights(model) - - def __setup_tpu_training(self, model: LightningModule, trainer): - # use the default device from the process - # tpu_device = xm.xla_device() - - # if given an ordinal device, use this as the device - if trainer.tpu_id is not None: - tpu_device = xm.xla_device(trainer.tpu_id) - else: - tpu_device = xm.xla_device() - # track the device and move model to it - trainer._device = tpu_device - model.to(trainer._device) - - # get the appropriate tpu ranks - trainer.tpu_local_core_rank = xm.get_local_ordinal() - trainer.tpu_global_core_rank = xm.get_ordinal() - - # avoid duplicating progress bar - if trainer.tpu_global_core_rank != 0 and trainer.progress_bar_callback is not None: - trainer.progress_bar_callback.disable() - - trainer.global_rank = trainer.tpu_local_core_rank - rank_zero_only.rank = trainer.global_rank - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - # init 16 bit for TPU - if trainer.precision == 16: - os.environ['XLA_USE_BF16'] = str(1) - - log.info(f'INIT TPU local core: {trainer.tpu_local_core_rank},' - f' global rank: {trainer.tpu_global_core_rank}' - f' with XLA_USE_BF16={os.environ.get("XLA_USE_BF16")}') - - def backward(self, closure_loss, optimizer, opt_idx, *args, **kwargs): - # do backward pass - if self.trainer.train_loop.automatic_optimization: - model = self.trainer.get_model() - model.backward(closure_loss, optimizer, opt_idx) - else: - closure_loss.backward(*args, **kwargs) - - # detach after backward - closure_loss = closure_loss.detach() - - return closure_loss - - def _clip_gradients(self, optimizer: Optimizer, grad_clip_val: Union[float, int], norm_type: float = 2.0): - # this code is a modification of torch.nn.utils.clip_grad_norm_ - # with TPU support based on https://github.com/pytorch/xla/blob/master/TROUBLESHOOTING.md - model = self.trainer.get_model() - parameters = model.parameters() - max_norm = grad_clip_val - - if isinstance(parameters, torch.Tensor): - parameters = [parameters] - parameters = list(filter(lambda p: p.grad is not None, parameters)) - - device = parameters[0].device - out = torch.empty(len(parameters), device=device) - for i, p in enumerate(parameters): - torch.norm(p.grad.data.to(device), norm_type, out=out[i]) - total_norm = torch.norm(out, norm_type) - - clip_coef = torch.tensor(max_norm, device=device) / (total_norm + self.norm_clipping_epsilon) - clip_coef = torch.min(clip_coef, torch.ones_like(clip_coef)) - for p in parameters: - p.grad.data.mul_(clip_coef.to(p.grad.data.device)) - - def barrier(self, name: Optional[str] = None): - torch_xla.core.xla_model.rendezvous(f"pl.Trainer.{name}") - - def early_stopping_should_stop(self, pl_module): - stop = torch.tensor(int(self.trainer.should_stop), device=pl_module.device, dtype=torch.int32) - stop = xm.mesh_reduce("stop_signal", stop, sum) - torch_xla.core.xla_model.rendezvous("pl.EarlyStoppingCallback.stop_distributed_training_check") - should_stop = int(stop.item()) == self.trainer.world_size - return should_stop - - def save_spawn_weights(self, model): - """ - Dump a temporary checkpoint after ddp ends to get weights out of the process - """ - # Todo: required argument `model` is not used - if self.trainer.is_global_zero: - path = os.path.join(self.trainer.default_root_dir, '__temp_weight_distributed_end.ckpt') - self.trainer.save_checkpoint(path) - return path - - def load_spawn_weights(self, original_model): - """ - Load the temp weights saved in the process - To recover the 
trained model from the ddp process we load the saved weights - """ - - loaded_model = original_model - - if self.trainer.is_global_zero: - # load weights saved in ddp - path = os.path.join(self.trainer.default_root_dir, '__temp_weight_distributed_end.ckpt') - loaded_model = original_model.__class__.load_from_checkpoint(path) - - # copy loaded weights to old model - original_model.load_state_dict(loaded_model.state_dict()) - - # remove ddp weights - os.remove(path) - - return loaded_model - - def broadcast(self, obj, src=0): - if self.trainer.tpu_id is not None: - # running on a single core - return obj - buffer = io.BytesIO() - torch.save(obj, buffer) - data = bytearray(buffer.getbuffer()) - data_tensor = torch.tensor(data).to(xm.xla_device(), dtype=torch.float) - data = xm.all_gather(data_tensor) - buffer = io.BytesIO(data.cpu().byte().numpy()) - obj = torch.load(buffer) - return obj - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - return tensor - - @property - def norm_clipping_epsilon(self): - return 1e-6 - - def on_save(self, checkpoint): - """ - Move XLA tensors to CPU before saving - Recommended on XLA Guide: - https://github.com/pytorch/xla/blob/master/API_GUIDE.md#saving-and-loading-xla-tensors - """ - return move_data_to_device(checkpoint, torch.device("cpu")) - - @property - def distributed_sampler_kwargs(self): - return dict(num_replicas=xm.xrt_world_size(), rank=xm.get_ordinal()) - - @property - def require_distributed_sampler(self): - return True From 56cf66608d04d8fa88f346fc9fcb7e89b0596c2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sat, 13 Feb 2021 00:27:18 +0100 Subject: [PATCH 2/3] update imports --- pytorch_lightning/trainer/data_loading.py | 2 +- pytorch_lightning/trainer/properties.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pytorch_lightning/trainer/data_loading.py b/pytorch_lightning/trainer/data_loading.py index b02f768361ec3..fd93c559ff7d2 100644 --- a/pytorch_lightning/trainer/data_loading.py +++ b/pytorch_lightning/trainer/data_loading.py @@ -21,7 +21,7 @@ from torch.utils.data import BatchSampler, DataLoader, RandomSampler, SequentialSampler from torch.utils.data.distributed import DistributedSampler -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator +from pytorch_lightning.accelerators import Accelerator from pytorch_lightning.core import LightningModule from pytorch_lightning.trainer.supporters import CombinedLoader from pytorch_lightning.utilities import rank_zero_warn diff --git a/pytorch_lightning/trainer/properties.py b/pytorch_lightning/trainer/properties.py index ee6d70f42f247..b7146f58c60d9 100644 --- a/pytorch_lightning/trainer/properties.py +++ b/pytorch_lightning/trainer/properties.py @@ -19,8 +19,8 @@ import torch +from pytorch_lightning.accelerators import Accelerator from pytorch_lightning.accelerators.accelerator_connector import BackendConnector -from pytorch_lightning.accelerators.legacy.accelerator import Accelerator from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint, ProgressBarBase from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.trainer.connectors.logger_connector import LoggerConnector From 557f8815fc9395f4a16c6664f8f4b9ab1f3ffa2f Mon Sep 17 00:00:00 2001 From: Jirka Borovec Date: Sat, 13 Feb 2021 00:56:22 +0100 Subject: [PATCH 3/3] formatting --- .yapfignore | 4 ---- 
tests/core/test_lightning_optimizer.py | 14 ++++++++++++-- tests/plugins/test_sharded_plugin.py | 15 ++++++++++++--- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/.yapfignore b/.yapfignore index e57441bcfb95c..6def4861b4858 100644 --- a/.yapfignore +++ b/.yapfignore @@ -1,9 +1,5 @@ .git/* -# TODO -pytorch_lightning/accelerators/legacy/* - - # TODO pytorch_lightning/plugins/legacy/* diff --git a/tests/core/test_lightning_optimizer.py b/tests/core/test_lightning_optimizer.py index 94a8c8f6a5906..a67d73a3bb16a 100644 --- a/tests/core/test_lightning_optimizer.py +++ b/tests/core/test_lightning_optimizer.py @@ -213,8 +213,18 @@ def test_state(tmpdir): lightning_dict = {} special_attrs = [ - "_accumulate_grad_batches", "_optimizer", "_optimizer_idx", "_support_closure", "_trainer", "__getstate__", - "__setstate__", "state_dict", "load_state_dict", "zero_grad", "__setstate__", "add_param_group", + "_accumulate_grad_batches", + "_optimizer", + "_optimizer_idx", + "_support_closure", + "_trainer", + "__getstate__", + "__setstate__", + "state_dict", + "load_state_dict", + "zero_grad", + "__setstate__", + "add_param_group", "_total_optimizer_step_calls", ] diff --git a/tests/plugins/test_sharded_plugin.py b/tests/plugins/test_sharded_plugin.py index a3c7ca61f2b47..85158adf5d59c 100644 --- a/tests/plugins/test_sharded_plugin.py +++ b/tests/plugins/test_sharded_plugin.py @@ -182,7 +182,10 @@ def test_ddp_sharded_plugin_resume_from_checkpoint(tmpdir): model = BoringModel() trainer = Trainer( - accelerator='ddp_sharded_spawn', num_processes=2, fast_dev_run=True, resume_from_checkpoint=checkpoint_path, + accelerator='ddp_sharded_spawn', + num_processes=2, + fast_dev_run=True, + resume_from_checkpoint=checkpoint_path, ) trainer.fit(model) @@ -212,7 +215,10 @@ def test_ddp_sharded_plugin_resume_from_checkpoint_downsize_gpus(tmpdir): model = BoringModel() trainer = Trainer( - accelerator='ddp_sharded_spawn', fast_dev_run=True, gpus=1, resume_from_checkpoint=checkpoint_path, + accelerator='ddp_sharded_spawn', + fast_dev_run=True, + gpus=1, + resume_from_checkpoint=checkpoint_path, ) trainer.fit(model) @@ -240,7 +246,10 @@ def test_ddp_sharded_plugin_resume_from_checkpoint_gpu_to_cpu(tmpdir): model = BoringModel() trainer = Trainer( - accelerator='ddp_sharded_spawn', num_processes=2, fast_dev_run=True, resume_from_checkpoint=checkpoint_path, + accelerator='ddp_sharded_spawn', + num_processes=2, + fast_dev_run=True, + resume_from_checkpoint=checkpoint_path, ) trainer.fit(model)