From d061960c358314a7f1a0cdaffa9f200c430b5dcf Mon Sep 17 00:00:00 2001
From: Manasa Venkatakrishnan <14958785+manasaV3@users.noreply.github.com>
Date: Mon, 21 Oct 2024 08:19:02 -0700
Subject: [PATCH] feat: Script to migrate the directory structure (#311)

* Updating tomograms
* updates to tomogram processing
* updates to tomogram key photo filename
* updates to tomogram neuroglancer filepath
* cleaning up tomogram and related entities
* Updating tiltseries with identifiers
* Updating alignment for tomogram relations
* Adding alignment metadata path to annotations and tomograms
* Updating the paths
* Updating the paths
* Updating failed tests
* Working tests, clean up needed
* fix: workaround for docker compose bugs. (#295)
* fix: support filtering entities by related object id's (#296)
* fix: support filtering entities by related object id's
* chore: add seed script. (#297)
* feat: Updates to the alignment entity (#298)
* Updates to schema and template
* Fixing tests
* Adding files to alignments metadata
* Update the alignment configs method type
* Fixing typo
* Adding undefined alignment_method_type
* chore: Documenting the Jensen config generation (#293)
* Adds comments to Jensen config generation
* Adding guardrails
* Clean up
* fix: method_name
* chore: Update footer of release please PR's. (#299)
* Adding tests
* cleaning up the paths
* Cleaning up paths
* fix tests
* Updating tomogram id generation
* Updating viz_config generation
* Making id directories
* Making id directories
* Making annotation id directory
* Cleaning up paths with id
* Updating neuroglancer precompute for dir structure
* Updating neuroglancer config for dir structure
* Updating raw tilt import to tiltseries_id directory
* Migration script
* Update for tomograms
* Adding support for alignments
* Migrating annotation precompute
* Migrating collection_metadata
* Migrating rawtilt and gains
* Clean up
* Clean up
* Clean up
* Enabling the move
* Adding metadata updates
* Uncommenting deletion
* Adding check before move
* Adding wdl
* minor fixes
* Updating enqueue script
* Updating annotation file names
* Updating the tomogram key photo path
* Fixing tests
* Fix key_photo migration
* Handling default alignments
* fix path standardization when bucket name not in path
* Lint.
---------

Co-authored-by: Jessica Gadling
---
 .../infra/schema_migrate-v0.0.1.wdl           |  51 +++
 ingestion_tools/scripts/common/fs.py          |  10 +
 ingestion_tools/scripts/enqueue_runs.py       |  90 +++-
 ingestion_tools/scripts/importers/tomogram.py |   4 +-
 .../scripts/importers/visualization_config.py |   1 +
 .../scripts/migrate_folder_structure.py       | 402 ++++++++++++++++++
 6 files changed, 553 insertions(+), 5 deletions(-)
 create mode 100644 ingestion_tools/infra/schema_migrate-v0.0.1.wdl
 create mode 100644 ingestion_tools/scripts/migrate_folder_structure.py

diff --git a/ingestion_tools/infra/schema_migrate-v0.0.1.wdl b/ingestion_tools/infra/schema_migrate-v0.0.1.wdl
new file mode 100644
index 000000000..e871b3e45
--- /dev/null
+++ b/ingestion_tools/infra/schema_migrate-v0.0.1.wdl
@@ -0,0 +1,51 @@
+version 1.0
+
+
+task cryoet_folder_migration_workflow {
+    input {
+        String docker_image_id
+        String aws_region
+        String config_file
+        String output_bucket
+        String flags
+    }
+
+    command <<<
+        set -euxo pipefail
+        export PYTHONUNBUFFERED=1
+        python --version 1>&2
+        ls -l 1>&2
+        pwd 1>&2
+        cd /usr/src/app/ingestion_tools/scripts
+        set +x
+        echo python migrate_folder_structure.py migrate ~{config_file} ~{output_bucket} ~{flags} 1>&2
+        python migrate_folder_structure.py migrate ~{config_file} ~{output_bucket} ~{flags} 1>&2
+    >>>
+
+    runtime {
+        docker: docker_image_id
+    }
+}
+
+workflow cryoet_folder_migration {
+    input {
+        String docker_image_id = "cryoet_data_ingestion:latest"
+        String aws_region = "us-west-2"
+        String config_file
+        String output_bucket
+        String flags
+    }
+
+    call cryoet_folder_migration_workflow {
+        input:
+            docker_image_id = docker_image_id,
+            aws_region = aws_region,
+            config_file = config_file,
+            output_bucket = output_bucket,
+            flags = flags
+    }
+
+    output {
+        File log = "output.txt"
+    }
+}
diff --git a/ingestion_tools/scripts/common/fs.py b/ingestion_tools/scripts/common/fs.py
index 12f2c08f1..ca796534d 100644
--- a/ingestion_tools/scripts/common/fs.py
+++ b/ingestion_tools/scripts/common/fs.py
@@ -67,6 +67,10 @@ def exists(self, path: str) -> bool:
     def read_block(self, path: str, start: int | None = None, end: int | None = None) -> str:
         pass
 
+    @abstractmethod
+    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
+        pass
+
 
 class S3Filesystem(FileSystemApi):
     def __init__(self, force_overwrite: bool, client_kwargs: None | dict[str, str] = None, **kwargs):
@@ -169,6 +173,9 @@ def read_block(self, path: str, start: int | None = None, end: int | None = None
 
         return local_dest_file
 
+    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
+        self.s3fs.mv(src_path, dest_path, **kwargs)
+
 
 class LocalFilesystem(FileSystemApi):
     def __init__(self, force_overwrite: bool):
@@ -203,3 +210,6 @@ def push(self, path: str) -> None:
 
     def exists(self, path: str) -> bool:
         return os.path.exists(path)
+
+    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
+        shutil.move(src_path, dest_path)
diff --git a/ingestion_tools/scripts/enqueue_runs.py b/ingestion_tools/scripts/enqueue_runs.py
index 4c3f70d1f..8c2bdec85 100644
--- a/ingestion_tools/scripts/enqueue_runs.py
+++ b/ingestion_tools/scripts/enqueue_runs.py
@@ -20,9 +20,11 @@
 from importers.deposition import DepositionImporter
 from importers.run import RunImporter
 from importers.utils import IMPORTERS
+from migrate_folder_structure import migrate_common_options
 from standardize_dirs import common_options as ingest_common_options
 
 from common.config import DepositionImportConfig
+from common.finders import DestinationFilteredMetadataFinder
 from common.fs import FileSystemApi
 
 logger = logging.getLogger("db_import")
@@ -124,10 +126,7 @@ def run_job(
     }
 
     session = Session(region_name=aws_region)
-    client = session.client(
-        service_name="stepfunctions",
-    )
-
+    client = session.client(service_name="stepfunctions")
     return client.start_execution(
         stateMachineArn=state_machine_arn,
         name=execution_name,
@@ -590,5 +589,88 @@ def sync(
     )
 
 
+@cli.command()
+@click.argument("config_file", required=True, type=str)
+@click.argument("output_bucket", required=True, type=str)
+@click.option("--migrate-everything", is_flag=True, default=False)
+@click.option(
+    "--swipe-wdl-key",
+    type=str,
+    required=True,
+    default="schema_migrate-v0.0.1.wdl",
+    help="Specify wdl key for custom workload",
+)
+@enqueue_common_options
+@migrate_common_options
+@click.pass_context
+def migrate(
+    ctx,
+    config_file: str,
+    output_bucket: str,
+    migrate_everything: bool,
+    swipe_wdl_key: str,
+    **kwargs,
+):
+    handle_common_options(ctx, kwargs)
+    fs_mode = "s3"
+    fs = FileSystemApi.get_fs_api(mode=fs_mode, force_overwrite=False)
+
+    config = DepositionImportConfig(fs, config_file, output_bucket, output_bucket, IMPORTERS)
+    # config.load_map_files()
+    filter_runs = [re.compile(pattern) for pattern in kwargs.get("filter_run_name", [])]
+    exclude_runs = [re.compile(pattern) for pattern in kwargs.get("exclude_run_name", [])]
+    filter_datasets = [re.compile(pattern) for pattern in kwargs.get("filter_dataset_name", [])]
+    exclude_datasets = [re.compile(pattern) for pattern in kwargs.get("exclude_dataset_name", [])]
+
+    # Always iterate over depositions, datasets and runs.
+    for dataset in DatasetImporter.finder(config):
+        if list(filter(lambda x: x.match(dataset.name), exclude_datasets)):
+            print(f"Excluding {dataset.name}..")
+            continue
+        if filter_datasets and not list(filter(lambda x: x.match(dataset.name), filter_datasets)):
+            print(f"Skipping {dataset.name}..")
+            continue
+        print(f"Processing dataset: {dataset.name}")
+
+        futures = []
+        with ProcessPoolExecutor(max_workers=ctx.obj["parallelism"]) as workerpool:
+            finder = DestinationFilteredMetadataFinder([], RunImporter)
+            for result_path in finder.find(config, {"dataset_name": dataset.name, "run_name": "*"}):
+                run_name = os.path.basename(os.path.dirname(result_path))
+                if list(filter(lambda x: x.match(run_name), exclude_runs)):
+                    print(f"Excluding {run_name}..")
+                    continue
+                if filter_runs and not list(filter(lambda x: x.match(run_name), filter_runs)):
+                    print(f"Skipping {run_name}..")
+                    continue
+                print(f"Processing {run_name}...")
+
+                per_run_args = {}
+                excluded_args = ["filter_dataset_name", "filter_run_name"]
+                for k, v in kwargs.items():
+                    if any(substring in k for substring in excluded_args):
+                        break
+                    per_run_args[k] = v
+                new_args = to_args(migrate_everything=migrate_everything, **per_run_args)  # make a copy
+                new_args.append(f"--filter-dataset-name '^{dataset.name}$'")
+                new_args.append(f"--filter-run-name '^{run_name}$'")
+
+                dataset_id = dataset.name
+                execution_name = f"{int(time.time())}-migrate-ds{dataset_id}-run{run_name}"
+
+                # execution name greater than 80 chars causes boto ValidationException
+                if len(execution_name) > 80:
+                    execution_name = execution_name[-80:]
+
+                wdl_args = {"config_file": config_file, "output_bucket": output_bucket, "flags": " ".join(new_args)}
+
+                futures.append(
+                    workerpool.submit(
+                        partial(run_job, execution_name, wdl_args, swipe_wdl_key=swipe_wdl_key, **ctx.obj),
+                    ),
+                )
+        wait(futures)
+
+
 if __name__ == "__main__":
     cli()
diff --git a/ingestion_tools/scripts/importers/tomogram.py b/ingestion_tools/scripts/importers/tomogram.py
index 753812a27..006b3882e 100644
--- a/ingestion_tools/scripts/importers/tomogram.py
+++ b/ingestion_tools/scripts/importers/tomogram.py
@@ -68,6 +68,7 @@ def __init__(
         path: str,
         allow_imports: bool,
         parents: dict[str, Any],
+        alignment_metadata_path: str = None,
     ):
         super().__init__(
             config=config,
@@ -77,7 +78,8 @@ def __init__(
             parents=parents,
             allow_imports=allow_imports,
         )
-        self.alignment_metadata_path = config.to_formatted_path(self.get_alignment_metadata_path())
+
+        self.alignment_metadata_path = config.to_formatted_path(alignment_metadata_path or self.get_alignment_metadata_path())
         self.identifier = TomogramIdentifierHelper.get_identifier(
             config,
             self.get_base_metadata(),
diff --git a/ingestion_tools/scripts/importers/visualization_config.py b/ingestion_tools/scripts/importers/visualization_config.py
index 8866a43d8..83ac16fca 100644
--- a/ingestion_tools/scripts/importers/visualization_config.py
+++ b/ingestion_tools/scripts/importers/visualization_config.py
@@ -32,6 +32,7 @@ def import_item(self) -> None:
         if not self.is_import_allowed():
             print(f"Skipping import of {self.name}")
             return
+
         tomogram = self.get_tomogram()
         if not tomogram.get_base_metadata().get("is_visualization_default"):
             print("Skipping import for tomogram that is not configured for default_visualization")
diff --git a/ingestion_tools/scripts/migrate_folder_structure.py b/ingestion_tools/scripts/migrate_folder_structure.py
new file mode 100644
index 000000000..57f1699bc
--- /dev/null
+++ b/ingestion_tools/scripts/migrate_folder_structure.py
@@ -0,0 +1,402 @@
+import json
+import os.path
+import re
+import urllib.parse
+from typing import Any, Optional
+
+import click as click
+from importers.alignment import AlignmentImporter
+from importers.base_importer import BaseImporter
+from importers.utils import IMPORTER_DEP_TREE, IMPORTERS
+from standardize_dirs import flatten_dependency_tree
+
+from common.config import DepositionImportConfig
+from common.formats import tojson
+from common.fs import FileSystemApi
+
+OLD_PATHS = {
+    "alignment": "{dataset_name}/{run_name}/TiltSeries/",
+    # "alignment_metadata": "{dataset_name}/{run_name}/Alignments/{alignment_id}-alignment_metadata.json",
+    "annotation": "{dataset_name}/{run_name}/Tomograms/VoxelSpacing{voxel_spacing_name}/Annotations",
+    "annotation_metadata": "{dataset_name}/{run_name}/Tomograms/VoxelSpacing{voxel_spacing_name}/Annotations/*.json",
+    "annotation_viz": (
+        "{dataset_name}/{run_name}/Tomograms/VoxelSpacing{voxel_spacing_name}/NeuroglancerPrecompute/{annotation_id}-*"
+    ),
+    "collection_metadata": "{dataset_name}/{run_name}/TiltSeries/",
+    "dataset": "{dataset_name}",
+    "dataset_keyphoto": "{dataset_name}/Images",
+    "dataset_metadata": "{dataset_name}/dataset_metadata.json",
+    "deposition": "depositions_metadata/{deposition_name}",
+    "deposition_keyphoto": "depositions_metadata/{deposition_name}/Images",
+    "deposition_metadata": "depositions_metadata/{deposition_name}/deposition_metadata.json",
+    "frame": "{dataset_name}/{run_name}/Frames",
+    "gain": "{dataset_name}/{run_name}/Frames/",
+    "key_image": "{dataset_name}/{run_name}/Tomograms/VoxelSpacing{voxel_spacing_name}/KeyPhotos/",
+    "rawtilt": "{dataset_name}/{run_name}/TiltSeries",
+    "run": "{dataset_name}/{run_name}",
+    "run_metadata": "{dataset_name}/{run_name}/run_metadata.json",
+    "tiltseries": "{dataset_name}/{run_name}/TiltSeries",
+    "tiltseries_metadata": "{dataset_name}/{run_name}/TiltSeries/tiltseries_metadata.json",
+    "tomogram": "{dataset_name}/{run_name}/Tomograms/VoxelSpacing{voxel_spacing_name}/CanonicalTomogram",
+    "tomogram_metadata": "{dataset_name}/{run_name}/Tomograms/VoxelSpacing{voxel_spacing_name}/CanonicalTomogram/"
+    "tomogram_metadata.json",
+    "viz_config": "{dataset_name}/{run_name}/Tomograms/VoxelSpacing{voxel_spacing_name}/CanonicalTomogram/"
+    "neuroglancer_config.json",
+    "voxel_spacing": "{dataset_name}/{run_name}/Tomograms/VoxelSpacing{voxel_spacing_name}",
+}
+
+
+def move(config: DepositionImportConfig, old_path: str, new_path: str):
+    if config.fs.exists(old_path):
+        print(f"Moving {old_path} to {new_path}")
+        config.fs.move(old_path, new_path, recursive=True)
+    else:
+        print(f"Skipping move of {old_path} as it does not exist")
+
+
+def delete(config: DepositionImportConfig, path: str):
+    print(f"Deleting {path}")
+    if hasattr(config.fs, "s3fs"):
+        config.fs.s3fs.rm(path)
+
+
+def migrate_volume(
+    cls, config: DepositionImportConfig, parents: dict[str, Any], kwargs,
+) -> None:
+    """
+    Moves the volumes of the entity to include the identifier in the path (default of 100)
+    Moves the metadata of the entity to include the identifier in the path (default of 100)
+    :param cls:
+    :param config:
+    :param parents:
+    :param kwargs:
+    :param migrate_metadata:
+    :return:
+    """
+    output_path = cls.get_output_path()
+    metadata_path = kwargs.get("metadata_path")
+    metadata_dir = os.path.dirname(metadata_path)
+    for fmt, key in {"mrc": "mrc_files", "zarr": "omezarr_dir"}.items():
+        filename = cls.metadata.get(key)
+        if isinstance(filename, list):
+            filename = filename[0]
+        old_path = os.path.join(metadata_dir, filename)
+        new_path = f"{output_path}.{fmt}"
+        move(config, old_path, new_path)
+    if kwargs.get("migrate_metadata", True):
+        move(config, metadata_path, cls.get_metadata_path())
+
+
+def migrate_tomograms(cls, config: DepositionImportConfig, parents: dict[str, Any], kwargs) -> None:
+    """
+    Moves the tomograms in the run to include the identifier in the path (default of 100)
+    Moves the tomogram_metadata in the run to include the identifier in the path (default of 100)
+    Update the path to Reconstruction/VoxelSpacingXX.XX/Tomograms/...
+
+    Update the tomogram path in the metadata json
+    Update the key_photos file path in the metadata json
+    Update the neuroglancer_config path name in the metadata json
+    :param cls:
+    :param config:
+    :param parents:
+    :param kwargs:
+    :return:
+    """
+    migrate_volume(cls, config, parents, {**kwargs, "migrate_metadata": False})
+    # This is a very hacky way to get the metadata path set up with the correct values
+    cls.allow_imports = True
+    cls.import_metadata()
+    delete(config, kwargs.get("metadata_path"))
+
+
+def migrate_viz_config(cls, config: DepositionImportConfig, parents: dict[str, Any], kwargs) -> None:
+    """
+    Renames the neuroglancer_config in the run to include the identifier (default of 100)
+    Updates the path of the neuroglancer_config.json to Reconstruction/VoxelSpacingXX.XX/NeuroglancerPrecompute
+    Update the path to the layer in the neuroglancer_config.json
+    :param cls:
+    :param config:
+    :param parents:
+    :param kwargs:
+    :return:
+    """
+    old_path = cls.path
+    new_path = cls.get_output_path()
+    with config.fs.open(old_path, "r") as f:
+        ng_config = json.load(f)
+    if image_layer := next((layer for layer in ng_config["layers"] if layer["type"] == "image"), None):
+        path = cls.parents["tomogram"].get_output_path().replace(config.output_prefix, "") + ".zarr"
+        image_url = urllib.parse.urljoin("https://files.cryoetdataportal.cziscience.com", path)
+        image_layer["source"]["url"] = "zarr://" + image_url
+
+    old_precompute_path = cls.parents["voxel_spacing"].get_output_path().replace(config.output_prefix, "")
+    new_precompute_path = old_precompute_path.replace("Tomogram", "Reconstruction")
+    for layer in ng_config["layers"]:
+        if layer["type"] in {"annotation", "segmentation"}:
+            url = layer["source"]["url"]
+            layer["source"]["url"] = url.replace(old_precompute_path, new_precompute_path)
+    with config.fs.open(new_path, "w") as f:
+        f.write(tojson(ng_config))
+
+    delete(config, old_path)
+
+
+def migrate_key_image(cls, config: DepositionImportConfig, parents: dict[str, Any], kwargs) -> None:
+    """
+    Renames the key photo in the run to include the identifier (default of 100)
+    Update the path to Reconstruction/VoxelSpacingXX.XX/KeyPhotos/..
+    :param cls:
+    :param config:
+    :param parents:
+    :param kwargs:
+    :return:
+    """
+    old_path = cls.path
+    file_name = f"{os.path.basename(old_path)}"
+    new_path = os.path.join(cls.get_output_path(), file_name)
+    move(config, old_path, new_path)
+
+
+def migrate_alignments(cls, config: DepositionImportConfig, parents: dict[str, Any], kwargs) -> None:
+    for old_path in cls.file_paths.values():
+        new_path = os.path.join(cls.get_output_path(), os.path.basename(old_path).lstrip(f"{cls.identifier}-"))
+        move(config, old_path, new_path)
+    metadata_path = kwargs.get("metadata_path")
+    if metadata_path:
+        move(config, metadata_path, cls.get_metadata_path())
+
+
+def migrate_annotations(cls, config: DepositionImportConfig, parents: dict[str, Any], kwargs) -> None:
+    """
+    Moves the annotations in the run to the Reconstruction path
+    Moves the annotation_metadata in the run to the Reconstruction path
+    :param cls:
+    :param config:
+    :param parents:
+    :param kwargs:
+    :return:
+    """
+    output_path = cls.get_output_path()
+    metadata_path = kwargs.get("metadata_path")
+    for file in cls.metadata["files"]:
+        filename = file["path"]
+        shape = file["shape"].lower()
+        old_path = os.path.join(config.output_prefix, filename)
+        new_path = f"{output_path}_{shape}{os.path.splitext(filename)[1]}"
+        file["path"] = new_path.replace(config.output_prefix, "").lstrip("/")
+        move(config, old_path, new_path)
+    cls.metadata["alignment_metadata_path"] = cls.local_metadata["alignment_metadata_path"]
+
+    path = f"{cls.get_output_path()}.json"
+    print(f"Writing metadata to {path}")
+    with config.fs.open(path, "w") as f:
+        f.write(tojson(cls.metadata))
+
+    delete(config, metadata_path)
+
+
+def migrate_files(cls, config: DepositionImportConfig, parents: dict[str, Any], kwargs) -> None:
+    dir_name = os.path.basename(cls.path)
+    old_path = cls.path
+    new_path = os.path.join(cls.get_output_path(), dir_name)
+    move(config, old_path, new_path)
+
+
+MIGRATION_MAP = {
+    "alignment": migrate_alignments,
+    "annotation": migrate_annotations,
+    "annotation_viz": migrate_files,
+    "collection_metadata": migrate_files,
+    "gain": migrate_files,
+    "key_image": migrate_key_image,
+    "rawtilt": migrate_files,
+    "tiltseries": migrate_volume,
+    "tomogram": migrate_tomograms,
+    "viz_config": migrate_viz_config,
+}
+
+
+@click.group()
+@click.pass_context
+def cli(ctx):
+    pass
+
+
+def migrate_common_options(func):
+    options = []
+    for cls in IMPORTERS:
+        plural_key = cls.plural_key.replace("_", "-")
+        importer_key = cls.type_key.replace("_", "-")
+        options.append(click.option(f"--migrate-{plural_key}", is_flag=True, default=False))
+        options.append(click.option(f"--filter-{importer_key}-name", type=str, default=None, multiple=True))
+        options.append(click.option(f"--exclude-{importer_key}-name", type=str, default=None, multiple=True))
+    for option in options:
+        func = option(func)
+    return func
+
+
+def _get_glob_vars(migrate_cls: BaseImporter, parents: dict[str, Any]) -> dict[str, Any]:
+    glob_vars = {f"{migrate_cls.type_key}_name": "*", f"{migrate_cls.type_key}_id": "*"}
+    if parents:
+        for parent in parents.values():
+            glob_vars.update(parent.get_glob_vars())
+    return glob_vars
+
+
+def finder(migrate_cls, config: DepositionImportConfig, **parents: dict[str, BaseImporter]):
+    if migrate_cls.type_key in {"deposition", "dataset", "dataset_keyphoto", "deposition_keyphoto"}:
+        # Uses the same default finders as ingestion for these entities
+        finder_configs = config.get_object_configs(migrate_cls.type_key)
+        for _finder in finder_configs:
+            sources = _finder.get("sources", [])
+            for source in sources:
+                source_finder_factory = migrate_cls.finder_factory(source, migrate_cls)
+                for item in source_finder_factory.find(config, {}, **parents):
+                    item.allow_imports = False
+                    yield item, {}
+    elif f"{migrate_cls.type_key}_metadata" in OLD_PATHS:
+        # Glob the metadata to define the entity
+        print(f"Finding metadata for {migrate_cls.type_key}..")
+        glob_vars = _get_glob_vars(migrate_cls, parents)
+        glob_str = OLD_PATHS.get(f"{migrate_cls.type_key}_metadata").format(**glob_vars)
+        glob_path = os.path.join(config.output_prefix, glob_str)
+        for file_path in config.fs.glob(glob_path):
+            args = {"metadata_path": file_path}
+            with config.fs.open(file_path, "r") as f:
+                metadata = json.load(f)
+            kwargs = {
+                "allow_imports": False,
+                "parents": parents,
+                "config": config,
+                "metadata": metadata,
+            }
+            if migrate_cls.type_key in {"annotation", "tomogram"}:
+                glob_vars = {**_get_glob_vars(migrate_cls, parents), **{"alignment_id": "100"}}
+                alignment_path = AlignmentImporter.dir_path.format(**glob_vars)
+                kwargs["alignment_metadata_path"] = os.path.join(alignment_path, "alignment_metadata.json")
+            if migrate_cls.type_key == "annotation":
+                name = re.search(re.compile(glob_str.replace("*", "(.*)")), file_path).group(1)
+                additional_args = {
+                    "identifier": int(name.split("-")[0]),
+                    "name": name,
+                    "path": os.path.join(config.output_prefix, metadata["files"][0]["path"]),
+                }
+                yield migrate_cls(**{**kwargs, **additional_args}), args
+            elif migrate_cls.type_key == "run":
+                name = re.search(re.compile(glob_str.replace("*", "(.*)")), file_path).group(1)
+                yield migrate_cls(name=name, path=file_path, **kwargs), args
+            else:
+                name, path, results = migrate_cls.get_name_and_path(metadata, None, file_path, {})
+                kwargs["name"] = name
+                if path:
+                    kwargs["path"] = path
+                elif results:
+                    kwargs["file_paths"] = results
+                yield migrate_cls(**kwargs), args
+    elif migrate_cls.type_key in OLD_PATHS and migrate_cls.type_key not in {"frame"}:
+        finder_configs = []
+        for configs in config.get_object_configs(migrate_cls.type_key):
+            for source in configs.get("sources", []):
+                if "source_multi_glob" in source:
+                    finder_configs.extend(source.get("source_multi_glob").get("list_globs"))
+                elif "source_glob" in source:
+                    finder_configs.append(source.get("source_glob").get("list_glob"))
+                elif "literal" in source:
+                    literal_values = source.get("literal").get("value")
+                    if not isinstance(literal_values, list):
+                        literal_values = [literal_values]
+                    finder_configs.append(literal_values)
+        for source_glob in finder_configs:
+            dest_suffix = None
+            if migrate_cls.type_key == "gain" and source_glob.endswith(".dm4"):
+                dest_suffix = "*.mrc"
+            if migrate_cls.type_key == "key_image":
+                dest_suffix = "*.png"
+            elif isinstance(source_glob, str):
+                dest_suffix = f"*{os.path.splitext(source_glob)[1]}"
+            elif migrate_cls.type_key == "alignment" and isinstance(source_glob, list):
+                print("Skipping alignment migration as source is a literal")
+                continue
+            path = OLD_PATHS.get(f"{migrate_cls.type_key}").format(**_get_glob_vars(migrate_cls, parents))
+            glob = os.path.join(path, dest_suffix) if dest_suffix else path
+            source = {
+                "destination_glob": {
+                    "list_glob": os.path.join(config.output_prefix, glob),
+                    "name_regex": "VoxelSpacing(.*)" if migrate_cls.type_key == "voxel_spacing" else "(.*)",
+                    "match_regex": "(.*)",
+                },
+            }
+            importer_factory = migrate_cls.finder_factory(source, migrate_cls)
+            for item in importer_factory.find(config, {}, **parents):
+                yield item, {}
+    return []
+
+
+def _migrate(config, tree, to_migrate, to_iterate, kwargs, parents: Optional[dict[str, Any]] = None):
+    parents = dict(parents) if parents else {}
+    for import_class, child_import_classes in tree.items():
+        if import_class not in to_iterate:
+            continue
+        filter_patterns = [re.compile(pattern) for pattern in kwargs.get(f"filter_{import_class.type_key}_name", [])]
+        exclude_patterns = [re.compile(pattern) for pattern in kwargs.get(f"exclude_{import_class.type_key}_name", [])]
+
+        parent_args = dict(parents)
+
+        for item, args in finder(import_class, config, **parent_args):
+            print(f"Iterating {item.type_key}: {item.name}")
+            if list(filter(lambda x: x.match(item.name), exclude_patterns)):
+                print(f"Excluding {item.name}..")
+                continue
+            if filter_patterns and not list(filter(lambda x: x.match(item.name), filter_patterns)):
+                print(f"Skipping {item.name}..")
+                continue
+
+            if child_import_classes:
+                sub_parents = {import_class.type_key: item}
+                sub_parents.update(parents)
+                _migrate(config, child_import_classes, to_migrate, to_iterate, kwargs, sub_parents)
+
+            type_key = item.type_key
+            if import_class in to_migrate and type_key in MIGRATION_MAP:
+                if OLD_PATHS.get(type_key, "").rstrip("/") == item.dir_path:
+                    print(f"Skipping {type_key} migration as old path and new path are same")
+                else:
+                    print(f"Migrating {type_key} {item.name}")
+                    MIGRATION_MAP.get(type_key)(item, config, parents, args)
+
+
+@cli.command()
+@click.argument("config_file", required=True, type=str)
+@click.argument("output_bucket", required=True, type=str)
+@click.option("--migrate-everything", is_flag=True, default=False)
+@click.option("--local-fs", type=bool, is_flag=True, default=False)
+@migrate_common_options
+@click.pass_context
+def migrate(
+    ctx,
+    config_file: str,
+    output_bucket: str,
+    migrate_everything: bool,
+    local_fs: bool,
+    **kwargs,
+):
+    fs_mode = "s3"
+    if local_fs:
+        fs_mode = "local"
+    # The increased read timeout prevents failures for large files.
+    fs = FileSystemApi.get_fs_api(mode=fs_mode, force_overwrite=False, config_kwargs={"read_timeout": 40})
+    config = DepositionImportConfig(fs, config_file, output_bucket, output_bucket, IMPORTERS)
+    # config.load_map_files()
+
+    iteration_deps = flatten_dependency_tree(IMPORTER_DEP_TREE).items()
+    if migrate_everything:
+        to_migrate = set(IMPORTERS)
+        to_iterate = set(IMPORTERS)
+    else:
+        to_migrate = {k for k in IMPORTERS if kwargs.get(f"migrate_{k.plural_key}")}
+        to_iterate = to_migrate.union({k for k, v in iteration_deps if to_migrate.intersection(v)})
+    _migrate(config, IMPORTER_DEP_TREE, to_migrate, to_iterate, kwargs)
+
+
+if __name__ == "__main__":
+    cli()
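
Usage sketch (illustrative only, not part of the patch): the config path, bucket, dataset and run names below are hypothetical, and the enqueue command may additionally need the queue/environment options supplied by enqueue_common_options. The per-importer flags generated by migrate_common_options (a --migrate-<plural> flag plus --filter/--exclude name filters for each entity type) can be used in place of --migrate-everything to migrate a subset of entities.

    # Run the folder migration directly for a single dataset/run
    python migrate_folder_structure.py migrate dataset_configs/10000.yaml cryoet-staging \
        --migrate-everything --filter-dataset-name '^10000$' --filter-run-name '^TS_026$'

    # Fan the same migration out to SWIPE, one execution per run, using the new WDL
    python enqueue_runs.py migrate dataset_configs/10000.yaml cryoet-staging \
        --migrate-everything --swipe-wdl-key schema_migrate-v0.0.1.wdl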