diff --git a/environment.yml b/environment.yml index cabfb940c4d6ac..2ffa30d3325530 100644 --- a/environment.yml +++ b/environment.yml @@ -34,6 +34,8 @@ dependencies: - pillow<7.0.0 - scikit-image - nltk>=3.3 + - boto3 + - moto>=1.3.14 # Optional - scikit-learn>=0.20.0 diff --git a/pytorch_lightning/callbacks/model_checkpoint.py b/pytorch_lightning/callbacks/model_checkpoint.py index c0a80d55f308e3..41015ff829db37 100644 --- a/pytorch_lightning/callbacks/model_checkpoint.py +++ b/pytorch_lightning/callbacks/model_checkpoint.py @@ -16,6 +16,7 @@ from pytorch_lightning import _logger as log from pytorch_lightning.callbacks.base import Callback from pytorch_lightning.utilities import rank_zero_warn, rank_zero_only +from pytorch_lightning.utilities.cloud_io import gfile, makedirs class ModelCheckpoint(Callback): @@ -100,7 +101,9 @@ def __init__(self, filepath: Optional[str] = None, monitor: str = 'val_loss', ve save_last: bool = False, save_top_k: int = 1, save_weights_only: bool = False, mode: str = 'auto', period: int = 1, prefix: str = ''): super().__init__() - if save_top_k > 0 and filepath is not None and os.path.isdir(filepath) and len(os.listdir(filepath)) > 0: + if filepath: + filepath = str(filepath) # the tests pass in a py.path.local but we want a str + if save_top_k > 0 and filepath is not None and gfile.isdir(filepath) and len(gfile.listdir(filepath)) > 0: rank_zero_warn( f"Checkpoint directory {filepath} exists and is not empty with save_top_k != 0." "All files in this directory will be deleted when a checkpoint is saved!" @@ -112,12 +115,13 @@ def __init__(self, filepath: Optional[str] = None, monitor: str = 'val_loss', ve if filepath is None: # will be determined by trainer at runtime self.dirpath, self.filename = None, None else: - if os.path.isdir(filepath): + if gfile.isdir(filepath): self.dirpath, self.filename = filepath, '{epoch}' else: filepath = os.path.realpath(filepath) self.dirpath, self.filename = os.path.split(filepath) - os.makedirs(self.dirpath, exist_ok=True) + if not gfile.exists(self.dirpath): + makedirs(self.dirpath) self.save_last = save_last self.save_top_k = save_top_k self.save_weights_only = save_weights_only @@ -159,8 +163,14 @@ def kth_best_model(self): return self.kth_best_model_path def _del_model(self, filepath): - if os.path.isfile(filepath): - os.remove(filepath) + if gfile.exists(filepath): + try: + # in compat mode, remove is not implemented, so if this is run + # against an actual remote file system and the correct remote + # dependencies exist then it will work fine.
+ gfile.remove(filepath) + except AttributeError: + os.remove(filepath) def _save_model(self, filepath, trainer, pl_module): @@ -168,7 +178,8 @@ def _save_model(self, filepath, trainer, pl_module): trainer.dev_debugger.track_checkpointing_history(filepath) # make paths - os.makedirs(os.path.dirname(filepath), exist_ok=True) + if not gfile.exists(os.path.dirname(filepath)): + makedirs(os.path.dirname(filepath)) # delegate the saving to the model if self.save_function is not None: @@ -308,7 +319,7 @@ def on_validation_end(self, trainer, pl_module): filepath = self.format_checkpoint_name(epoch, metrics) version_cnt = 0 - while os.path.isfile(filepath): + while gfile.exists(filepath): filepath = self.format_checkpoint_name(epoch, metrics, ver=version_cnt) # this epoch called before version_cnt += 1 diff --git a/pytorch_lightning/core/saving.py b/pytorch_lightning/core/saving.py index 20fa8a8840405c..dae8e2e2ec07fa 100644 --- a/pytorch_lightning/core/saving.py +++ b/pytorch_lightning/core/saving.py @@ -1,5 +1,6 @@ import ast import csv +import io import inspect import os @@ -11,6 +12,7 @@ from pytorch_lightning import _logger as log from pytorch_lightning.utilities import rank_zero_warn, AttributeDict from pytorch_lightning.utilities.cloud_io import load as pl_load +from pytorch_lightning.utilities.cloud_io import gfile, cloud_open PRIMITIVE_TYPES = (bool, int, float, str) ALLOWED_CONFIG_TYPES = (AttributeDict, MutableMapping, Namespace) @@ -273,30 +275,30 @@ def load_hparams_from_tags_csv(tags_csv: str) -> Dict[str, Any]: True >>> os.remove(path_csv) """ - if not os.path.isfile(tags_csv): - rank_zero_warn(f'Missing Tags: {tags_csv}.', RuntimeWarning) + if not gfile.exists(tags_csv): + rank_zero_warn(f"Missing Tags: {tags_csv}.", RuntimeWarning) return {} - with open(tags_csv) as fp: - csv_reader = csv.reader(fp, delimiter=',') + with cloud_open(tags_csv, "r") as fp: + csv_reader = csv.reader(fp.read().splitlines(), delimiter=",") tags = {row[0]: convert(row[1]) for row in list(csv_reader)[1:]} return tags def save_hparams_to_tags_csv(tags_csv: str, hparams: Union[dict, Namespace]) -> None: - if not os.path.isdir(os.path.dirname(tags_csv)): - raise RuntimeError(f'Missing folder: {os.path.dirname(tags_csv)}.') + if not gfile.isdir(os.path.dirname(tags_csv)): + raise RuntimeError(f"Missing folder: {os.path.dirname(tags_csv)}.") if isinstance(hparams, Namespace): hparams = vars(hparams) - with open(tags_csv, 'w', newline='') as fp: - fieldnames = ['key', 'value'] + with cloud_open(tags_csv, "w", newline="") as fp: + fieldnames = ["key", "value"] writer = csv.DictWriter(fp, fieldnames=fieldnames) - writer.writerow({'key': 'key', 'value': 'value'}) + writer.writerow({"key": "key", "value": "value"}) for k, v in hparams.items(): - writer.writerow({'key': k, 'value': v}) + writer.writerow({"key": k, "value": v}) def load_hparams_from_yaml(config_yaml: str) -> Dict[str, Any]: @@ -310,11 +312,11 @@ def load_hparams_from_yaml(config_yaml: str) -> Dict[str, Any]: True >>> os.remove(path_yaml) """ - if not os.path.isfile(config_yaml): - rank_zero_warn(f'Missing Tags: {config_yaml}.', RuntimeWarning) + if not gfile.exists(config_yaml): + rank_zero_warn(f"Missing Tags: {config_yaml}.", RuntimeWarning) return {} - with open(config_yaml) as fp: + with cloud_open(config_yaml, "r") as fp: tags = yaml.load(fp) return tags @@ -326,11 +328,12 @@ def save_hparams_to_yaml(config_yaml, hparams: Union[dict, Namespace]) -> None: config_yaml: path to new YAML file hparams: parameters to be saved """ - if not
os.path.isdir(os.path.dirname(config_yaml)): - raise RuntimeError(f'Missing folder: {os.path.dirname(config_yaml)}.') + if not gfile.isdir(os.path.dirname(config_yaml)): + raise RuntimeError(f"Missing folder: {os.path.dirname(config_yaml)}.") if OMEGACONF_AVAILABLE and isinstance(hparams, Container): from omegaconf import OmegaConf + OmegaConf.save(hparams, config_yaml, resolve=True) return @@ -341,7 +344,7 @@ def save_hparams_to_yaml(config_yaml, hparams: Union[dict, Namespace]) -> None: hparams = dict(hparams) assert isinstance(hparams, dict) - with open(config_yaml, 'w', newline='') as fp: + with cloud_open(config_yaml, "w", newline="") as fp: yaml.dump(hparams, fp) diff --git a/pytorch_lightning/loggers/tensorboard.py b/pytorch_lightning/loggers/tensorboard.py index f88b5f97cff8b8..ea97d1e69d773a 100644 --- a/pytorch_lightning/loggers/tensorboard.py +++ b/pytorch_lightning/loggers/tensorboard.py @@ -16,6 +16,7 @@ from pytorch_lightning.core.saving import save_hparams_to_yaml from pytorch_lightning.loggers.base import LightningLoggerBase, rank_zero_experiment from pytorch_lightning.utilities import rank_zero_only +from pytorch_lightning.utilities.cloud_io import gfile, makedirs try: from omegaconf import Container, OmegaConf @@ -109,7 +110,8 @@ def experiment(self) -> SummaryWriter: return self._experiment assert rank_zero_only.rank == 0, 'tried to init log dirs in non global_rank=0' - os.makedirs(self.root_dir, exist_ok=True) + if self.root_dir and not gfile.exists(str(self.root_dir)): + makedirs(self.root_dir) self._experiment = SummaryWriter(log_dir=self.log_dir, **self._kwargs) return self._experiment @@ -162,7 +164,7 @@ def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> def save(self) -> None: super().save() dir_path = self.log_dir - if not os.path.isdir(dir_path): + if not gfile.isdir(dir_path): dir_path = self.save_dir # prepare the file path @@ -188,13 +190,13 @@ def version(self) -> int: def _get_next_version(self): root_dir = os.path.join(self.save_dir, self.name) - if not os.path.isdir(root_dir): + if not gfile.isdir(root_dir): log.warning('Missing logger folder: %s', root_dir) return 0 existing_versions = [] - for d in os.listdir(root_dir): - if os.path.isdir(os.path.join(root_dir, d)) and d.startswith("version_"): + for d in gfile.listdir(root_dir): + if gfile.isdir(os.path.join(root_dir, d)) and d.startswith("version_"): existing_versions.append(int(d.split("_")[1])) if len(existing_versions) == 0: diff --git a/pytorch_lightning/trainer/training_io.py b/pytorch_lightning/trainer/training_io.py index 1c223903556e42..87fc4ee8c41d30 100644 --- a/pytorch_lightning/trainer/training_io.py +++ b/pytorch_lightning/trainer/training_io.py @@ -105,6 +105,7 @@ ) from pytorch_lightning.utilities import rank_zero_warn, NATIVE_AMP_AVALAIBLE from pytorch_lightning.utilities.cloud_io import load as pl_load +from pytorch_lightning.utilities.cloud_io import gfile, makedirs try: import torch_xla @@ -409,9 +410,9 @@ def restore_hpc_weights_if_needed(self, model: LightningModule): did_restore = False # look for hpc weights - folderpath = self.weights_save_path - if os.path.exists(folderpath): - files = os.listdir(folderpath) + folderpath = str(self.weights_save_path) + if gfile.exists(folderpath): + files = gfile.listdir(folderpath) hpc_weight_paths = [x for x in files if 'hpc_ckpt' in x] # if hpc weights exist restore model @@ -490,15 +491,17 @@ def restore_training_state(self, checkpoint): # ---------------------------------- def hpc_save(self, folderpath: str, 
logger): # make sure the checkpoint folder exists - os.makedirs(folderpath, exist_ok=True) + folderpath = str(folderpath) # because the tests pass a path object + if not gfile.exists(folderpath): + makedirs(folderpath) # save logger to make sure we get all the metrics logger.save() ckpt_number = self.max_ckpt_in_folder(folderpath) + 1 - if not os.path.exists(folderpath): - os.makedirs(folderpath, exist_ok=True) + if not gfile.exists(folderpath): + makedirs(folderpath) filepath = os.path.join(folderpath, f'hpc_ckpt_{ckpt_number}.ckpt') # give model a chance to do something on hpc_save @@ -551,7 +554,7 @@ def hpc_load(self, folderpath, on_gpu): log.info(f'restored hpc model from: {filepath}') def max_ckpt_in_folder(self, path, name_key='ckpt_'): - files = os.listdir(path) + files = gfile.listdir(str(path)) files = [x for x in files if name_key in x] if len(files) == 0: return 0 diff --git a/pytorch_lightning/utilities/cloud_io.py b/pytorch_lightning/utilities/cloud_io.py index 2877163e27d135..7329213b20d4fe 100644 --- a/pytorch_lightning/utilities/cloud_io.py +++ b/pytorch_lightning/utilities/cloud_io.py @@ -1,11 +1,56 @@ -import torch - +import sys +import os +from typing import Union from pathlib import Path from urllib.parse import urlparse +import torch + +import tensorboard +from packaging import version +from pytorch_lightning import _logger as log + +# we want this for tf.io.gfile: if tf is installed this gives the full tf version, +# otherwise a pruned down version which works for some file backends but +# not all +from tensorboard.compat import tf + +gfile = tf.io.gfile + +pathlike = Union[Path, str] + +# older versions of tensorboard had buggy gfile compatibility layers, +# so only support remote cloud paths on newer versions +modern_gfile = version.parse(tensorboard.version.VERSION) >= version.parse('2.0') def load(path_or_url: str, map_location=None): if urlparse(path_or_url).scheme == '' or Path(path_or_url).drive: # no scheme or with a drive letter return torch.load(path_or_url, map_location=map_location) - else: - return torch.hub.load_state_dict_from_url(path_or_url, map_location=map_location) + return torch.hub.load_state_dict_from_url(path_or_url, map_location=map_location) + + +def cloud_open(path: pathlike, mode: str, newline: str = None): + if not modern_gfile: + log.debug( + "tensorboard.compat gfile does not work on older versions " + "of tensorboard; falling back to normal local file open." + ) + return open(path, mode, newline=newline) + if sys.platform == "win32": + log.debug( + "gfile does not handle newlines correctly on windows so remote files are not " + "supported; falling back to normal local file open."
+ ) + return open(path, mode, newline=newline) + try: + return gfile.GFile(path, mode) + except NotImplementedError as e: + # minimal dependencies are installed and only local files will work + return open(path, mode) + + +def makedirs(path: pathlike): + if modern_gfile and hasattr(gfile, "makedirs"): + return gfile.makedirs(str(path)) + # otherwise minimal dependencies are installed and only local files will work + return os.makedirs(path, exist_ok=True) diff --git a/requirements/base.txt b/requirements/base.txt index 4282f6a12d2eb2..c40ef16d85be94 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -7,3 +7,4 @@ future>=0.17.1 # required for builtins in setup.py # pyyaml>=3.13 PyYAML>=5.1 # OmegaConf requirement >=5.1 tqdm>=4.41.0 +packaging diff --git a/requirements/test.txt b/requirements/test.txt index 8491cf8c830402..e306dfbb7be46b 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -12,4 +12,7 @@ black==19.10b0 pre-commit>=1.0 cloudpickle>=1.2 + +boto3 +moto>=1.3.14 nltk>=3.3 diff --git a/tests/trainer/test_trainer.py b/tests/trainer/test_trainer.py index 2444d9905f6a11..75bdc297e3d4e9 100644 --- a/tests/trainer/test_trainer.py +++ b/tests/trainer/test_trainer.py @@ -1,23 +1,28 @@ import glob import math import os +import platform import pickle import sys import types +import boto3 +import botocore from argparse import Namespace from pathlib import Path from unittest.mock import patch +import tensorboard import cloudpickle import pytest import torch +from packaging import version from omegaconf import OmegaConf +from moto import mock_s3 import tests.base.develop_utils as tutils from pytorch_lightning import Callback, LightningModule, Trainer from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint -from pytorch_lightning.core.saving import ( - load_hparams_from_tags_csv, load_hparams_from_yaml, save_hparams_to_tags_csv) +from pytorch_lightning.core.saving import load_hparams_from_tags_csv, load_hparams_from_yaml, save_hparams_to_tags_csv from pytorch_lightning.loggers import TensorBoardLogger from pytorch_lightning.trainer.logging import TrainerLoggingMixin from pytorch_lightning.utilities.cloud_io import load as pl_load @@ -25,11 +30,11 @@ from tests.base import EvalModelTemplate -@pytest.mark.parametrize('url_ckpt', [True, False]) +@pytest.mark.parametrize("url_ckpt", [True, False]) def test_no_val_module(monkeypatch, tmpdir, tmpdir_server, url_ckpt): """Tests use case where trainer saves the model, and user loads it from tags independently.""" # set $TORCH_HOME, which determines torch hub's cache path, to tmpdir - monkeypatch.setenv('TORCH_HOME', str(tmpdir)) + monkeypatch.setenv("TORCH_HOME", str(tmpdir)) model = EvalModelTemplate() @@ -37,40 +42,38 @@ def test_no_val_module(monkeypatch, tmpdir, tmpdir_server, url_ckpt): logger = tutils.get_default_logger(tmpdir) trainer = Trainer( - default_root_dir=tmpdir, - max_epochs=1, - logger=logger, - checkpoint_callback=ModelCheckpoint(tmpdir), + default_root_dir=tmpdir, max_epochs=1, logger=logger, checkpoint_callback=ModelCheckpoint(tmpdir), ) # fit model result = trainer.fit(model) # training complete - assert result == 1, 'amp + ddp model failed to complete' + assert result == 1, "amp + ddp model failed to complete" # save model - new_weights_path = os.path.join(tmpdir, 'save_test.ckpt') + new_weights_path = os.path.join(tmpdir, "save_test.ckpt") trainer.save_checkpoint(new_weights_path) # assert ckpt has hparams ckpt = torch.load(new_weights_path) - assert 
LightningModule.CHECKPOINT_HYPER_PARAMS_KEY in ckpt.keys(), 'module_arguments missing from checkpoints' + assert LightningModule.CHECKPOINT_HYPER_PARAMS_KEY in ckpt.keys(), "module_arguments missing from checkpoints" # load new model hparams_path = tutils.get_data_path(logger, path_dir=tmpdir) - hparams_path = os.path.join(hparams_path, 'hparams.yaml') - ckpt_path = f'http://{tmpdir_server[0]}:{tmpdir_server[1]}/{os.path.basename(new_weights_path)}' if url_ckpt else new_weights_path - model_2 = EvalModelTemplate.load_from_checkpoint( - checkpoint_path=ckpt_path, - hparams_file=hparams_path, + hparams_path = os.path.join(hparams_path, "hparams.yaml") + ckpt_path = ( + f"http://{tmpdir_server[0]}:{tmpdir_server[1]}/{os.path.basename(new_weights_path)}" + if url_ckpt + else new_weights_path ) + model_2 = EvalModelTemplate.load_from_checkpoint(checkpoint_path=ckpt_path, hparams_file=hparams_path,) model_2.eval() -@pytest.mark.parametrize('url_ckpt', [True, False]) +@pytest.mark.parametrize("url_ckpt", [True, False]) def test_no_val_end_module(monkeypatch, tmpdir, tmpdir_server, url_ckpt): """Tests use case where trainer saves the model, and user loads it from tags independently.""" # set $TORCH_HOME, which determines torch hub's cache path, to tmpdir - monkeypatch.setenv('TORCH_HOME', tmpdir) + monkeypatch.setenv("TORCH_HOME", tmpdir) model = EvalModelTemplate() @@ -79,38 +82,32 @@ def test_no_val_end_module(monkeypatch, tmpdir, tmpdir_server, url_ckpt): # fit model trainer = Trainer( - default_root_dir=tmpdir, - max_epochs=1, - logger=logger, - checkpoint_callback=ModelCheckpoint(tmpdir), + default_root_dir=tmpdir, max_epochs=1, logger=logger, checkpoint_callback=ModelCheckpoint(tmpdir), ) result = trainer.fit(model) # traning complete - assert result == 1, 'amp + ddp model failed to complete' + assert result == 1, "amp + ddp model failed to complete" # save model - new_weights_path = os.path.join(tmpdir, 'save_test.ckpt') + new_weights_path = os.path.join(tmpdir, "save_test.ckpt") trainer.save_checkpoint(new_weights_path) # load new model hparams_path = tutils.get_data_path(logger, path_dir=tmpdir) - hparams_path = os.path.join(hparams_path, 'hparams.yaml') - ckpt_path = f'http://{tmpdir_server[0]}:{tmpdir_server[1]}/{os.path.basename(new_weights_path)}' if url_ckpt else new_weights_path - model_2 = EvalModelTemplate.load_from_checkpoint( - checkpoint_path=ckpt_path, - hparams_file=hparams_path, + hparams_path = os.path.join(hparams_path, "hparams.yaml") + ckpt_path = ( + f"http://{tmpdir_server[0]}:{tmpdir_server[1]}/{os.path.basename(new_weights_path)}" + if url_ckpt + else new_weights_path ) + model_2 = EvalModelTemplate.load_from_checkpoint(checkpoint_path=ckpt_path, hparams_file=hparams_path,) model_2.eval() @pytest.mark.parametrize( - ['schedule', 'expected'], - [ - pytest.param({1: 2, 3: 4}, [1, 2, 4]), - pytest.param(3, [3, 3, 3]), - pytest.param(4, [4, 4, 4]) - ] + ["schedule", "expected"], + [pytest.param({1: 2, 3: 4}, [1, 2, 4]), pytest.param(3, [3, 3, 3]), pytest.param(4, [4, 4, 4])], ) def test_gradient_accumulation_scheduling(tmpdir, schedule, expected): """ @@ -128,7 +125,7 @@ def test_gradient_accumulation_scheduling(tmpdir, schedule, expected): with pytest.raises(TypeError): assert Trainer(accumulate_grad_batches=[[2, 3], [4, 6]]) with pytest.raises(TypeError): - assert Trainer(accumulate_grad_batches={1: 2, 3.: 4}) + assert Trainer(accumulate_grad_batches={1: 2, 3.0: 4}) with pytest.raises(TypeError): assert Trainer(accumulate_grad_batches={1: 2.5, 3: 5}) @@ -143,9 +140,16 
@@ def test_gradient_accumulation_scheduling(tmpdir, schedule, expected): ) # test optimizer call freq matches scheduler - def _optimizer_step(epoch, batch_idx, optimizer, optimizer_idx, - second_order_closure=None, on_tpu=False, - using_native_amp=False, using_lbfgs=False): + def _optimizer_step( + epoch, + batch_idx, + optimizer, + optimizer_idx, + second_order_closure=None, + on_tpu=False, + using_native_amp=False, + using_lbfgs=False, + ): # only test the first 12 batches in epoch if batch_idx < 12: if epoch == 0: @@ -200,7 +204,7 @@ def test_loading_meta_tags(tmpdir): # save tags logger = tutils.get_default_logger(tmpdir) - logger.log_hyperparams(Namespace(some_str='a_str', an_int=1, a_float=2.0)) + logger.log_hyperparams(Namespace(some_str="a_str", an_int=1, a_float=2.0)) logger.log_hyperparams(hparams) logger.save() @@ -210,7 +214,7 @@ def test_loading_meta_tags(tmpdir): hparams = load_hparams_from_yaml(hparams_path) # save as legacy meta_tags.csv - tags_path = os.path.join(path_expt_dir, 'meta_tags.csv') + tags_path = os.path.join(path_expt_dir, "meta_tags.csv") save_hparams_to_tags_csv(tags_path, hparams) tags = load_hparams_from_tags_csv(tags_path) @@ -225,16 +229,16 @@ def test_loading_yaml(tmpdir): # save tags logger = tutils.get_default_logger(tmpdir) - logger.log_hyperparams(Namespace(some_str='a_str', an_int=1, a_float=2.0)) + logger.log_hyperparams(Namespace(some_str="a_str", an_int=1, a_float=2.0)) logger.log_hyperparams(hparams) logger.save() # load hparams path_expt_dir = tutils.get_data_path(logger, path_dir=tmpdir) - hparams_path = os.path.join(path_expt_dir, 'hparams.yaml') + hparams_path = os.path.join(path_expt_dir, "hparams.yaml") tags = load_hparams_from_yaml(hparams_path) - assert tags['batch_size'] == 32 and tags['hidden_dim'] == 1000 + assert tags["batch_size"] == 32 and tags["hidden_dim"] == 1000 def test_dp_output_reduce(): @@ -248,55 +252,64 @@ def test_dp_output_reduce(): assert mixin.reduce_distributed_output(out, num_gpus=2) == out.mean() # when we have a dict of vals - out = { - 'a': out, - 'b': { - 'c': out - } - } + out = {"a": out, "b": {"c": out}} reduced = mixin.reduce_distributed_output(out, num_gpus=3) - assert reduced['a'] == out['a'] - assert reduced['b']['c'] == out['b']['c'] - - -@pytest.mark.parametrize(["save_top_k", "save_last", "file_prefix", "expected_files"], [ - pytest.param(-1, False, '', {'epoch=4.ckpt', 'epoch=3.ckpt', 'epoch=2.ckpt', 'epoch=1.ckpt', 'epoch=0.ckpt'}, - id="CASE K=-1 (all)"), - pytest.param(1, False, 'test_prefix_', {'test_prefix_epoch=4.ckpt'}, - id="CASE K=1 (2.5, epoch 4)"), - pytest.param(2, False, '', {'epoch=4.ckpt', 'epoch=2.ckpt'}, - id="CASE K=2 (2.5 epoch 4, 2.8 epoch 2)"), - pytest.param(4, False, '', {'epoch=1.ckpt', 'epoch=4.ckpt', 'epoch=3.ckpt', 'epoch=2.ckpt'}, - id="CASE K=4 (save all 4 base)"), - pytest.param(3, False, '', {'epoch=2.ckpt', 'epoch=3.ckpt', 'epoch=4.ckpt'}, - id="CASE K=3 (save the 2nd, 3rd, 4th model)"), - pytest.param(1, True, '', {'epoch=4.ckpt', 'last.ckpt'}, - id="CASE K=1 (save the 4th model and the last model)"), -]) + assert reduced["a"] == out["a"] + assert reduced["b"]["c"] == out["b"]["c"] + + +@pytest.mark.parametrize( + ["save_top_k", "save_last", "file_prefix", "expected_files"], + [ + pytest.param( + -1, + False, + "", + {"epoch=4.ckpt", "epoch=3.ckpt", "epoch=2.ckpt", "epoch=1.ckpt", "epoch=0.ckpt"}, + id="CASE K=-1 (all)", + ), + pytest.param(1, False, "test_prefix_", {"test_prefix_epoch=4.ckpt"}, id="CASE K=1 (2.5, epoch 4)"), + pytest.param(2, False, "", 
{"epoch=4.ckpt", "epoch=2.ckpt"}, id="CASE K=2 (2.5 epoch 4, 2.8 epoch 2)"), + pytest.param( + 4, + False, + "", + {"epoch=1.ckpt", "epoch=4.ckpt", "epoch=3.ckpt", "epoch=2.ckpt"}, + id="CASE K=4 (save all 4 base)", + ), + pytest.param( + 3, False, "", {"epoch=2.ckpt", "epoch=3.ckpt", "epoch=4.ckpt"}, id="CASE K=3 (save the 2nd, 3rd, 4th model)" + ), + pytest.param(1, True, "", {"epoch=4.ckpt", "last.ckpt"}, id="CASE K=1 (save the 4th model and the last model)"), + ], +) def test_model_checkpoint_options(tmpdir, save_top_k, save_last, file_prefix, expected_files): """Test ModelCheckpoint options.""" def mock_save_function(filepath, *args): - open(filepath, 'a').close() + open(filepath, "a").close() # simulated losses losses = [10, 9, 2.8, 5, 2.5] - checkpoint_callback = ModelCheckpoint(tmpdir, save_top_k=save_top_k, save_last=save_last, - prefix=file_prefix, verbose=1) + checkpoint_callback = ModelCheckpoint( + tmpdir, save_top_k=save_top_k, save_last=save_last, prefix=file_prefix, verbose=1 + ) checkpoint_callback.save_function = mock_save_function trainer = Trainer() # emulate callback's calls during the training for i, loss in enumerate(losses): trainer.current_epoch = i - trainer.callback_metrics = {'val_loss': torch.tensor(loss)} + trainer.callback_metrics = {"val_loss": torch.tensor(loss)} checkpoint_callback.on_validation_end(trainer, trainer.get_model()) file_lists = set(os.listdir(tmpdir)) - assert len(file_lists) == len(expected_files), \ - "Should save %i models when save_top_k=%i" % (len(expected_files), save_top_k) + assert len(file_lists) == len(expected_files), "Should save %i models when save_top_k=%i" % ( + len(expected_files), + save_top_k, + ) # verify correct naming for fname in expected_files: @@ -310,35 +323,33 @@ def test_model_checkpoint_only_weights(tmpdir): model = EvalModelTemplate() trainer = Trainer( - default_root_dir=tmpdir, - max_epochs=1, - checkpoint_callback=ModelCheckpoint(tmpdir, save_weights_only=True), + default_root_dir=tmpdir, max_epochs=1, checkpoint_callback=ModelCheckpoint(tmpdir, save_weights_only=True), ) # fit model result = trainer.fit(model) # training complete - assert result == 1, 'training failed to complete' + assert result == 1, "training failed to complete" checkpoint_path = list(trainer.checkpoint_callback.best_k_models.keys())[0] # assert saved checkpoint has no trainer data checkpoint = torch.load(checkpoint_path) - assert 'optimizer_states' not in checkpoint, 'checkpoint should contain only model weights' - assert 'lr_schedulers' not in checkpoint, 'checkpoint should contain only model weights' + assert "optimizer_states" not in checkpoint, "checkpoint should contain only model weights" + assert "lr_schedulers" not in checkpoint, "checkpoint should contain only model weights" # assert loading model works when checkpoint has only weights assert EvalModelTemplate.load_from_checkpoint(checkpoint_path=checkpoint_path) # directly save model - new_weights_path = os.path.join(tmpdir, 'save_test.ckpt') + new_weights_path = os.path.join(tmpdir, "save_test.ckpt") trainer.save_checkpoint(new_weights_path, weights_only=True) # assert saved checkpoint has no trainer data checkpoint = torch.load(new_weights_path) - assert 'optimizer_states' not in checkpoint, 'checkpoint should contain only model weights' - assert 'lr_schedulers' not in checkpoint, 'checkpoint should contain only model weights' + assert "optimizer_states" not in checkpoint, "checkpoint should contain only model weights" + assert "lr_schedulers" not in checkpoint, "checkpoint 
should contain only model weights" # assert restoring train state fails - with pytest.raises(KeyError, match='checkpoint contains only the model'): + with pytest.raises(KeyError, match="checkpoint contains only the model"): trainer.restore_training_state(checkpoint) @@ -350,11 +361,11 @@ def test_model_freeze_unfreeze(): model.unfreeze() -@pytest.mark.parametrize('url_ckpt', [True, False]) +@pytest.mark.parametrize("url_ckpt", [True, False]) def test_resume_from_checkpoint_epoch_restored(monkeypatch, tmpdir, tmpdir_server, url_ckpt): """Verify resuming from checkpoint runs the right number of epochs""" # set $TORCH_HOME, which determines torch hub's cache path, to tmpdir - monkeypatch.setenv('TORCH_HOME', tmpdir) + monkeypatch.setenv("TORCH_HOME", tmpdir) hparams = EvalModelTemplate.get_default_hparams() @@ -391,7 +402,7 @@ def increment_on_load_checkpoint(self, _): checkpoint_callback=ModelCheckpoint(tmpdir, save_top_k=-1), default_root_dir=tmpdir, early_stop_callback=False, - val_check_interval=1., + val_check_interval=1.0, ) trainer = Trainer(**trainer_options) @@ -405,21 +416,21 @@ def increment_on_load_checkpoint(self, _): assert model.num_on_load_checkpoint_called == 0 # Other checkpoints can be uncommented if/when resuming mid-epoch is supported - checkpoints = sorted(glob.glob(os.path.join(trainer.checkpoint_callback.dirpath, '*.ckpt'))) + checkpoints = sorted(glob.glob(os.path.join(trainer.checkpoint_callback.dirpath, "*.ckpt"))) if url_ckpt: # transform local paths into url checkpoints ip, port = tmpdir_server - checkpoints = [f'http://{ip}:{port}/' + os.path.basename(check) for check in checkpoints] + checkpoints = [f"http://{ip}:{port}/" + os.path.basename(check) for check in checkpoints] for check in checkpoints: next_model = _new_model() state = pl_load(check) # Resume training - trainer_options['max_epochs'] = 2 + trainer_options["max_epochs"] = 2 new_trainer = Trainer(**trainer_options, resume_from_checkpoint=check) new_trainer.fit(next_model) - assert state['global_step'] + next_model.num_batches_seen == training_batches * trainer_options['max_epochs'] + assert state["global_step"] + next_model.num_batches_seen == training_batches * trainer_options["max_epochs"] assert next_model.num_on_load_checkpoint_called == 1 @@ -432,9 +443,7 @@ def _init_steps_model(): # get number of samples in 1 epoch num_train_samples = math.floor(len(model.train_dataloader()) * train_percent) - trainer_options = dict( - limit_train_batches=train_percent, - ) + trainer_options = dict(limit_train_batches=train_percent,) return model, trainer_options, num_train_samples @@ -444,9 +453,7 @@ def test_trainer_max_steps_and_epochs(tmpdir): # define less train steps than epochs trainer_options.update( - default_root_dir=tmpdir, - max_epochs=3, - max_steps=num_train_samples + 10, + default_root_dir=tmpdir, max_epochs=3, max_steps=num_train_samples + 10, ) # fit model @@ -459,8 +466,7 @@ def test_trainer_max_steps_and_epochs(tmpdir): # define less train epochs than steps trainer_options.update( - max_epochs=2, - max_steps=trainer_options['max_epochs'] * 2 * num_train_samples, + max_epochs=2, max_steps=trainer_options["max_epochs"] * 2 * num_train_samples, ) # fit model @@ -480,14 +486,14 @@ def test_trainer_min_steps_and_epochs(tmpdir): # define callback for stopping the model and default epochs trainer_options.update( default_root_dir=tmpdir, - early_stop_callback=EarlyStopping(monitor='val_loss', min_delta=1.0), + early_stop_callback=EarlyStopping(monitor="val_loss", min_delta=1.0), val_check_interval=2, 
min_epochs=1, max_epochs=7, ) # define less min steps than 1 epoch - trainer_options['min_steps'] = math.floor(num_train_samples / 2) + trainer_options["min_steps"] = math.floor(num_train_samples / 2) # fit model trainer = Trainer(**trainer_options) @@ -495,11 +501,12 @@ def test_trainer_min_steps_and_epochs(tmpdir): assert result == 1, "Training did not complete" # check model ran for at least min_epochs - assert trainer.global_step >= num_train_samples and \ - trainer.current_epoch > 0, "Model did not train for at least min_epochs" + assert ( + trainer.global_step >= num_train_samples and trainer.current_epoch > 0 + ), "Model did not train for at least min_epochs" # define less epochs than min_steps - trainer_options['min_steps'] = math.floor(num_train_samples * 1.5) + trainer_options["min_steps"] = math.floor(num_train_samples * 1.5) # fit model trainer = Trainer(**trainer_options) @@ -507,8 +514,9 @@ def test_trainer_min_steps_and_epochs(tmpdir): assert result == 1, "Training did not complete" # check model ran for at least num_train_samples*1.5 - assert trainer.global_step >= math.floor(num_train_samples * 1.5) and \ - trainer.current_epoch > 0, "Model did not train for at least min_steps" + assert ( + trainer.global_step >= math.floor(num_train_samples * 1.5) and trainer.current_epoch > 0 + ), "Model did not train for at least min_steps" def test_benchmark_option(tmpdir): @@ -521,11 +529,7 @@ def test_benchmark_option(tmpdir): assert not torch.backends.cudnn.benchmark # fit model - trainer = Trainer( - default_root_dir=tmpdir, - max_epochs=1, - benchmark=True, - ) + trainer = Trainer(default_root_dir=tmpdir, max_epochs=1, benchmark=True,) result = trainer.fit(model) # verify training completed @@ -535,8 +539,8 @@ def test_benchmark_option(tmpdir): assert torch.backends.cudnn.benchmark -@pytest.mark.parametrize('ckpt_path', [None, 'best', 'specific']) -@pytest.mark.parametrize('save_top_k', [-1, 0, 1, 2]) +@pytest.mark.parametrize("ckpt_path", [None, "best", "specific"]) +@pytest.mark.parametrize("save_top_k", [-1, 0, 1, 2]) def test_test_checkpoint_path(tmpdir, ckpt_path, save_top_k): hparams = EvalModelTemplate.get_default_hparams() @@ -548,10 +552,10 @@ def test_test_checkpoint_path(tmpdir, ckpt_path, save_top_k): checkpoint_callback=ModelCheckpoint(save_top_k=save_top_k), ) trainer.fit(model) - if ckpt_path == 'best': + if ckpt_path == "best": # ckpt_path is 'best', meaning we load the best weights if save_top_k <= 0: - with pytest.raises(MisconfigurationException, match='.*is not configured to save the best.*'): + with pytest.raises(MisconfigurationException, match=".*is not configured to save the best.*"): trainer.test(ckpt_path=ckpt_path) else: trainer.test(ckpt_path=ckpt_path) @@ -565,9 +569,13 @@ def test_test_checkpoint_path(tmpdir, ckpt_path, save_top_k): # specific checkpoint, pick one from saved ones if save_top_k == 0: with pytest.raises(FileNotFoundError): - trainer.test(ckpt_path='random.ckpt') + trainer.test(ckpt_path="random.ckpt") else: - ckpt_path = str(list((Path(tmpdir) / f'lightning_logs/version_{trainer.logger.version}/checkpoints').iterdir())[0].absolute()) + ckpt_path = str( + list((Path(tmpdir) / f"lightning_logs/version_{trainer.logger.version}/checkpoints").iterdir())[ + 0 + ].absolute() + ) trainer.test(ckpt_path=ckpt_path) assert trainer.tested_ckpt_path == ckpt_path @@ -604,12 +612,10 @@ def validation_epoch_end(self, *args, **kwargs): result = trainer.fit(model) # check that limit_val_batches=0 turns off validation - assert result == 1, 'training 
failed to complete' + assert result == 1, "training failed to complete" assert trainer.current_epoch == 1 - assert not model.validation_step_invoked, \ - '`validation_step` should not run when `limit_val_batches=0`' - assert not model.validation_epoch_end_invoked, \ - '`validation_epoch_end` should not run when `limit_val_batches=0`' + assert not model.validation_step_invoked, "`validation_step` should not run when `limit_val_batches=0`" + assert not model.validation_epoch_end_invoked, "`validation_epoch_end` should not run when `limit_val_batches=0`" # check that limit_val_batches has no influence when fast_dev_run is turned on model = CurrentModel(**hparams) @@ -617,16 +623,13 @@ def validation_epoch_end(self, *args, **kwargs): trainer = Trainer(**trainer_options) result = trainer.fit(model) - assert result == 1, 'training failed to complete' + assert result == 1, "training failed to complete" assert trainer.current_epoch == 0 - assert model.validation_step_invoked, \ - 'did not run `validation_step` with `fast_dev_run=True`' - assert model.validation_epoch_end_invoked, \ - 'did not run `validation_epoch_end` with `fast_dev_run=True`' + assert model.validation_step_invoked, "did not run `validation_step` with `fast_dev_run=True`" + assert model.validation_epoch_end_invoked, "did not run `validation_epoch_end` with `fast_dev_run=True`" def test_nan_loss_detection(tmpdir): - class CurrentModel(EvalModelTemplate): test_batch_inf_loss = 8 @@ -634,7 +637,7 @@ def training_step(self, batch, batch_idx, optimizer_idx=None): output = super().training_step(batch, batch_idx, optimizer_idx) if batch_idx == self.test_batch_inf_loss: if isinstance(output, dict): - output['loss'] *= torch.tensor(math.inf) # make loss infinite + output["loss"] *= torch.tensor(math.inf) # make loss infinite else: output /= 0 return output @@ -642,13 +645,9 @@ def training_step(self, batch, batch_idx, optimizer_idx=None): model = CurrentModel() # fit model - trainer = Trainer( - default_root_dir=tmpdir, - max_steps=(model.test_batch_inf_loss + 1), - terminate_on_nan=True, - ) + trainer = Trainer(default_root_dir=tmpdir, max_steps=(model.test_batch_inf_loss + 1), terminate_on_nan=True,) - with pytest.raises(ValueError, match=r'.*The loss returned in `training_step` is nan or inf.*'): + with pytest.raises(ValueError, match=r".*The loss returned in `training_step` is nan or inf.*"): trainer.fit(model) assert trainer.global_step == model.test_step_inf_loss @@ -657,7 +656,6 @@ def training_step(self, batch, batch_idx, optimizer_idx=None): def test_nan_params_detection(tmpdir): - class CurrentModel(EvalModelTemplate): test_batch_nan = 8 @@ -667,13 +665,9 @@ def on_after_backward(self): torch.nn.init.constant_(self.c_d1.bias, math.nan) model = CurrentModel() - trainer = Trainer( - default_root_dir=tmpdir, - max_steps=(model.test_batch_nan + 1), - terminate_on_nan=True, - ) + trainer = Trainer(default_root_dir=tmpdir, max_steps=(model.test_batch_nan + 1), terminate_on_nan=True,) - with pytest.raises(ValueError, match=r'.*Detected nan and/or inf values in `c_d1.bias`.*'): + with pytest.raises(ValueError, match=r".*Detected nan and/or inf values in `c_d1.bias`.*"): trainer.fit(model) assert trainer.global_step == model.test_batch_nan @@ -734,12 +728,7 @@ def _optimizer_step(*args, **kwargs): grad_norm = torch.norm(torch.stack([torch.norm(p.grad.detach(), 2) for p in parameters]), 2) assert (grad_norm - 1.0).abs() < 0.01, "Gradient norm != 1.0: {grad_norm}".format(grad_norm=grad_norm) - trainer = Trainer( - max_steps=1, - 
max_epochs=1, - gradient_clip_val=1.0, - default_root_dir=tmpdir, - ) + trainer = Trainer(max_steps=1, max_epochs=1, gradient_clip_val=1.0, default_root_dir=tmpdir,) # for the test model.optimizer_step = _optimizer_step @@ -749,9 +738,7 @@ def _optimizer_step(*args, **kwargs): def test_gpu_choice(tmpdir): - trainer_options = dict( - default_root_dir=tmpdir, - ) + trainer_options = dict(default_root_dir=tmpdir,) # Only run if CUDA is available if not torch.cuda.is_available(): return @@ -759,38 +746,39 @@ def test_gpu_choice(tmpdir): num_gpus = torch.cuda.device_count() Trainer(**trainer_options, gpus=num_gpus, auto_select_gpus=True) - with pytest.raises(RuntimeError, match=r'.*No GPUs available.*'): + with pytest.raises(RuntimeError, match=r".*No GPUs available.*"): Trainer(**trainer_options, gpus=num_gpus + 1, auto_select_gpus=True) -@pytest.mark.parametrize(['tpu_cores', 'expected_tpu_id', 'error_expected'], [ - pytest.param(1, None, False), - pytest.param(8, None, False), - pytest.param([1], 1, False), - pytest.param([8], 8, False), - pytest.param('1,', 1, False), - pytest.param('1', None, False), - pytest.param('9, ', 9, True), - pytest.param([9], 9, True), - pytest.param([0], 0, True), - pytest.param(2, None, True), - pytest.param(10, None, True), -]) +@pytest.mark.parametrize( + ["tpu_cores", "expected_tpu_id", "error_expected"], + [ + pytest.param(1, None, False), + pytest.param(8, None, False), + pytest.param([1], 1, False), + pytest.param([8], 8, False), + pytest.param("1,", 1, False), + pytest.param("1", None, False), + pytest.param("9, ", 9, True), + pytest.param([9], 9, True), + pytest.param([0], 0, True), + pytest.param(2, None, True), + pytest.param(10, None, True), + ], +) def test_tpu_choice(tmpdir, tpu_cores, expected_tpu_id, error_expected): if error_expected: - with pytest.raises(MisconfigurationException, match=r'.*tpu_cores` can only be 1, 8 or [<1-8>]*'): + with pytest.raises(MisconfigurationException, match=r".*tpu_cores` can only be 1, 8 or [<1-8>]*"): Trainer(default_root_dir=tmpdir, tpu_cores=tpu_cores, auto_select_gpus=True) else: trainer = Trainer(default_root_dir=tmpdir, tpu_cores=tpu_cores, auto_select_gpus=True) assert trainer.tpu_id == expected_tpu_id -@pytest.mark.parametrize(['limit_val_batches'], [ - pytest.param(0.0), # this should run no sanity checks - pytest.param(1), - pytest.param(1.0), - pytest.param(0.3), -]) +@pytest.mark.parametrize( + ["limit_val_batches"], + [pytest.param(0.0), pytest.param(1), pytest.param(1.0), pytest.param(0.3),], # this should run no sanity checks +) def test_num_sanity_val_steps(tmpdir, limit_val_batches): """ Test that num_sanity_val_steps=-1 runs through all validation data once. 
@@ -805,93 +793,232 @@ def test_num_sanity_val_steps(tmpdir, limit_val_batches): limit_val_batches=limit_val_batches, # should have no influence max_steps=1, ) - assert trainer.num_sanity_val_steps == float('inf') + assert trainer.num_sanity_val_steps == float("inf") val_dataloaders = model.val_dataloader__multiple() - with patch.object(trainer, 'evaluation_forward', wraps=trainer.evaluation_forward) as mocked: + with patch.object(trainer, "evaluation_forward", wraps=trainer.evaluation_forward) as mocked: trainer.fit(model, val_dataloaders=val_dataloaders) assert mocked.call_count == sum(len(dl) * (limit_val_batches > 0) for dl in val_dataloaders) -@pytest.mark.parametrize("trainer_kwargs,expected", [ - pytest.param( - dict(distributed_backend=None, gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) - ), - pytest.param( - dict(distributed_backend="dp", gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) - ), - pytest.param( - dict(distributed_backend="dp", gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) - ), - pytest.param( - dict(distributed_backend="ddp", gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) - ), - pytest.param( - dict(distributed_backend="ddp", num_processes=2, gpus=None), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=2) - ), - pytest.param( - dict(distributed_backend="ddp", num_nodes=2, gpus=None), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) - ), - pytest.param( - dict(distributed_backend="ddp_cpu", num_processes=2, gpus=None), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=2) - ), - pytest.param( - dict(distributed_backend="ddp2", gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) - ), - pytest.param( - dict(distributed_backend=None, gpus=1), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=1, on_gpu=True, use_single_gpu=True, num_processes=1), - marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] - ), - pytest.param( - dict(distributed_backend="dp", gpus=1), - dict(use_dp=True, use_ddp=False, use_ddp2=False, num_gpus=1, on_gpu=True, use_single_gpu=True, num_processes=1), - marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] - ), - pytest.param( - dict(distributed_backend="ddp", gpus=1), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=1, on_gpu=True, use_single_gpu=True, num_processes=1), - marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] - ), - pytest.param( - dict(distributed_backend="ddp_cpu", num_processes=2, gpus=1), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=2), - marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] - ), - pytest.param( - dict(distributed_backend="ddp2", gpus=1), - dict(use_dp=False, use_ddp=False, use_ddp2=True, num_gpus=1, on_gpu=True, use_single_gpu=False, num_processes=1), - marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] - 
), - pytest.param( - dict(distributed_backend=None, gpus=2), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=2, on_gpu=True, use_single_gpu=False, num_processes=2), - marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")] - ), - pytest.param( - dict(distributed_backend="dp", gpus=2), - dict(use_dp=True, use_ddp=False, use_ddp2=False, num_gpus=2, on_gpu=True, use_single_gpu=False, num_processes=1), - marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")] - ), - pytest.param( - dict(distributed_backend="ddp", gpus=2), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=2, on_gpu=True, use_single_gpu=False, num_processes=2), - marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")] - ), - pytest.param( - dict(distributed_backend="ddp2", gpus=2), - dict(use_dp=False, use_ddp=False, use_ddp2=True, num_gpus=2, on_gpu=True, use_single_gpu=False, num_processes=1), - marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")] - ), -]) +@pytest.mark.parametrize( + "trainer_kwargs,expected", + [ + pytest.param( + dict(distributed_backend=None, gpus=None), + dict( + use_dp=False, + use_ddp=False, + use_ddp2=False, + num_gpus=0, + on_gpu=False, + use_single_gpu=False, + num_processes=1, + ), + ), + pytest.param( + dict(distributed_backend="dp", gpus=None), + dict( + use_dp=False, + use_ddp=False, + use_ddp2=False, + num_gpus=0, + on_gpu=False, + use_single_gpu=False, + num_processes=1, + ), + ), + pytest.param( + dict(distributed_backend="dp", gpus=None), + dict( + use_dp=False, + use_ddp=False, + use_ddp2=False, + num_gpus=0, + on_gpu=False, + use_single_gpu=False, + num_processes=1, + ), + ), + pytest.param( + dict(distributed_backend="ddp", gpus=None), + dict( + use_dp=False, + use_ddp=False, + use_ddp2=False, + num_gpus=0, + on_gpu=False, + use_single_gpu=False, + num_processes=1, + ), + ), + pytest.param( + dict(distributed_backend="ddp", num_processes=2, gpus=None), + dict( + use_dp=False, + use_ddp=True, + use_ddp2=False, + num_gpus=0, + on_gpu=False, + use_single_gpu=False, + num_processes=2, + ), + ), + pytest.param( + dict(distributed_backend="ddp", num_nodes=2, gpus=None), + dict( + use_dp=False, + use_ddp=True, + use_ddp2=False, + num_gpus=0, + on_gpu=False, + use_single_gpu=False, + num_processes=1, + ), + ), + pytest.param( + dict(distributed_backend="ddp_cpu", num_processes=2, gpus=None), + dict( + use_dp=False, + use_ddp=True, + use_ddp2=False, + num_gpus=0, + on_gpu=False, + use_single_gpu=False, + num_processes=2, + ), + ), + pytest.param( + dict(distributed_backend="ddp2", gpus=None), + dict( + use_dp=False, + use_ddp=False, + use_ddp2=False, + num_gpus=0, + on_gpu=False, + use_single_gpu=False, + num_processes=1, + ), + ), + pytest.param( + dict(distributed_backend=None, gpus=1), + dict( + use_dp=False, + use_ddp=False, + use_ddp2=False, + num_gpus=1, + on_gpu=True, + use_single_gpu=True, + num_processes=1, + ), + marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")], + ), + pytest.param( + dict(distributed_backend="dp", gpus=1), + dict( + use_dp=True, + use_ddp=False, + use_ddp2=False, + num_gpus=1, + on_gpu=True, + use_single_gpu=True, + num_processes=1, + ), + marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")], + ), + pytest.param( + dict(distributed_backend="ddp", gpus=1), + dict( + use_dp=False, + use_ddp=True, + use_ddp2=False, + num_gpus=1, + on_gpu=True, + 
use_single_gpu=True, + num_processes=1, + ), + marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")], + ), + pytest.param( + dict(distributed_backend="ddp_cpu", num_processes=2, gpus=1), + dict( + use_dp=False, + use_ddp=True, + use_ddp2=False, + num_gpus=0, + on_gpu=False, + use_single_gpu=False, + num_processes=2, + ), + marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")], + ), + pytest.param( + dict(distributed_backend="ddp2", gpus=1), + dict( + use_dp=False, + use_ddp=False, + use_ddp2=True, + num_gpus=1, + on_gpu=True, + use_single_gpu=False, + num_processes=1, + ), + marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")], + ), + pytest.param( + dict(distributed_backend=None, gpus=2), + dict( + use_dp=False, + use_ddp=True, + use_ddp2=False, + num_gpus=2, + on_gpu=True, + use_single_gpu=False, + num_processes=2, + ), + marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")], + ), + pytest.param( + dict(distributed_backend="dp", gpus=2), + dict( + use_dp=True, + use_ddp=False, + use_ddp2=False, + num_gpus=2, + on_gpu=True, + use_single_gpu=False, + num_processes=1, + ), + marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")], + ), + pytest.param( + dict(distributed_backend="ddp", gpus=2), + dict( + use_dp=False, + use_ddp=True, + use_ddp2=False, + num_gpus=2, + on_gpu=True, + use_single_gpu=False, + num_processes=2, + ), + marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")], + ), + pytest.param( + dict(distributed_backend="ddp2", gpus=2), + dict( + use_dp=False, + use_ddp=False, + use_ddp2=True, + num_gpus=2, + on_gpu=True, + use_single_gpu=False, + num_processes=1, + ), + marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")], + ), + ], +) def test_trainer_config(trainer_kwargs, expected): trainer = Trainer(**trainer_kwargs) assert trainer.use_dp is expected["use_dp"] @@ -908,83 +1035,117 @@ def test_trainer_subclassing(): # First way of pulling out args from signature is to list them class TrainerSubclass(Trainer): - - def __init__(self, custom_arg, *args, custom_kwarg='test', **kwargs): + def __init__(self, custom_arg, *args, custom_kwarg="test", **kwargs): super().__init__(*args, **kwargs) self.custom_arg = custom_arg self.custom_kwarg = custom_kwarg - trainer = TrainerSubclass(123, custom_kwarg='custom', fast_dev_run=True) + trainer = TrainerSubclass(123, custom_kwarg="custom", fast_dev_run=True) result = trainer.fit(model) assert result == 1 assert trainer.custom_arg == 123 - assert trainer.custom_kwarg == 'custom' + assert trainer.custom_kwarg == "custom" assert trainer.fast_dev_run # Second way is to pop from the dict # It's a special case because Trainer does not have any positional args class TrainerSubclass(Trainer): - def __init__(self, **kwargs): - self.custom_arg = kwargs.pop('custom_arg', 0) - self.custom_kwarg = kwargs.pop('custom_kwarg', 'test') + self.custom_arg = kwargs.pop("custom_arg", 0) + self.custom_kwarg = kwargs.pop("custom_kwarg", "test") super().__init__(**kwargs) - trainer = TrainerSubclass(custom_kwarg='custom', fast_dev_run=True) + trainer = TrainerSubclass(custom_kwarg="custom", fast_dev_run=True) result = trainer.fit(model) assert result == 1 - assert trainer.custom_kwarg == 'custom' + assert trainer.custom_kwarg == "custom" assert trainer.fast_dev_run # when we pass in an unknown arg, the base class should complain with 
pytest.raises(TypeError, match=r"__init__\(\) got an unexpected keyword argument 'abcdefg'"): - TrainerSubclass(abcdefg='unknown_arg') + TrainerSubclass(abcdefg="unknown_arg") -@pytest.mark.parametrize('trainer_params', [ - OmegaConf.create({'max_epochs': 1, 'gpus': 1}), - OmegaConf.create({'max_epochs': 1, 'gpus': [0]}), -]) +@pytest.mark.parametrize( + "trainer_params", + [OmegaConf.create({"max_epochs": 1, "gpus": 1}), OmegaConf.create({"max_epochs": 1, "gpus": [0]}),], +) @pytest.mark.skipif(not torch.cuda.is_available(), reason="test requires GPU machine") def test_trainer_omegaconf(trainer_params): Trainer(**trainer_params) def test_trainer_pickle(tmpdir): - trainer = Trainer( - max_epochs=1, - default_root_dir=tmpdir, - ) + trainer = Trainer(max_epochs=1, default_root_dir=tmpdir,) pickle.dumps(trainer) cloudpickle.dumps(trainer) +@pytest.fixture(scope="function") +def aws_credentials(): + """Mocked AWS Credentials for moto.""" + os.environ["AWS_ACCESS_KEY_ID"] = "testing" # nosec + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" # nosec + os.environ["AWS_SECURITY_TOKEN"] = "testing" # nosec + os.environ["AWS_SESSION_TOKEN"] = "testing" # nosec + + +@pytest.mark.skipif(platform.system() == "Windows", reason="Saving to remote paths is not supported on Windows") +@pytest.mark.skipif( + version.parse(tensorboard.version.VERSION) < version.parse("2.0"), reason="remote paths require tensorboard>=2.0" +) +def test_trainer_s3_path(monkeypatch, aws_credentials): + """Test that we can save to remote directories.""" + + # put everything on a remote s3 path: set $TORCH_HOME, which determines + # torch hub's cache path, to the mocked s3 bucket + monkeypatch.setenv("TORCH_HOME", "s3://test_bucket/") + + model = EvalModelTemplate() + with mock_s3(): + conn = boto3.resource("s3", region_name="us-east-1") + conn.create_bucket(Bucket="test_bucket") + tb_logger = TensorBoardLogger("s3://test_bucket/logs/") # use an s3 path + trainer = Trainer(max_epochs=1, default_root_dir="s3://test_bucket/outputpath/", logger=tb_logger) + result = trainer.fit(model) + + # outside of mock_s3 the same run should fail explicitly, which shows the + # trainer really does try to write to the remote location when s3 is not set up + with pytest.raises(botocore.exceptions.ClientError): + tb_logger = TensorBoardLogger("s3://test_bucket/logs/") + trainer = Trainer(max_epochs=1, default_root_dir="s3://test_bucket/outputpath_fail/", logger=tb_logger) + result = trainer.fit(model) + + def test_trainer_setup_call(tmpdir): """Test setup call with fit and test call.""" class CurrentModel(EvalModelTemplate): - def setup(self, stage): self.stage = stage class TrainerSubclass(Trainer): - def setup(self, stage): self.stage = stage model = CurrentModel() # fit model - trainer = TrainerSubclass( - default_root_dir=tmpdir, - max_epochs=1, - checkpoint_callback=False - ) + trainer = TrainerSubclass(default_root_dir=tmpdir, max_epochs=1, checkpoint_callback=False) trainer.fit(model) - assert trainer.stage == 'fit' - assert trainer.get_model().stage == 'fit' + assert trainer.stage == "fit" + assert trainer.get_model().stage == "fit" trainer.test(ckpt_path=None) - assert trainer.stage == 'test' - assert trainer.get_model().stage == 'test' + assert trainer.stage == "test" + assert trainer.get_model().stage == "test" + + +def test_trainer_ddp_spawn_none_checkpoint(tmpdir): + model = EvalModelTemplate() + trainer = Trainer(default_root_dir=tmpdir, max_epochs=1, checkpoint_callback=None, distributed_backend="ddp_spawn") + assert trainer.checkpoint_callback is None + result = trainer.fit(model) +
assert trainer.checkpoint_callback is None + assert result == 1
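For reference, a minimal usage sketch of the remote-path support this patch introduces, mirroring `test_trainer_s3_path` above: it assumes tensorboard>=2.0 plus the new boto3/moto test dependencies, and the bucket name and output paths are illustrative only.

```python
# Minimal sketch, not part of the patch: exercise s3:// checkpoint and log paths
# under moto's mocked S3, as the new test does. No real AWS account is touched.
import boto3
from moto import mock_s3

from pytorch_lightning import Trainer
from pytorch_lightning.loggers import TensorBoardLogger
from tests.base import EvalModelTemplate

with mock_s3():
    # create the fake bucket that the s3:// paths below point at
    boto3.resource("s3", region_name="us-east-1").create_bucket(Bucket="test_bucket")

    model = EvalModelTemplate()
    logger = TensorBoardLogger("s3://test_bucket/logs/")  # event files written via tf.io.gfile
    trainer = Trainer(
        max_epochs=1,
        default_root_dir="s3://test_bucket/outputs/",  # checkpoints land on the mocked bucket
        logger=logger,
    )
    trainer.fit(model)
```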