feat: Script to migrate the directory structure (#311)
* Updating tomograms

* updates to tomogram processing

* updates to tomogram key photo filename

* updates to tomogram neuroglancer filepath

* cleaning up tomogram and related entities

* Updating tiltseries with identifiers

* Updating alignment for tomogram relations

* Adding alignment metadata path to annotations and tomograms

* Updating the paths

* Updating the paths

* Updating failed tests

* Working tests; clean up needed

* fix: workaround for docker compose bugs. (#295)

* fix: support filtering entities by related object IDs (#296)

* fix: support filtering entities by related object IDs

* chore: add seed script. (#297)

* feat: Updates to the alignment entity (#298)

* Updates to schema and template

* Fixing tests

* Adding files to alignments metadata

* Update the alignment configs method type

* Fixing typo

* Adding undefined alignment_method_type

* chore: Documenting the Jensen config generation (#293)

* Adds comments to jensen config generation

* Adding guardrails

* Clean up

* fix: method_name

* chore: Update footer of release-please PRs. (#299)

* Adding tests

* cleaning up the paths

* Cleaning up paths

* fix tests

* Updating tomogram id generation

* Updating viz_config generation

* Making id directories

* Making id directories

* Making annotation id directory

* Cleaning up paths with id

* Updating neuroglancer precompute for dir structure

* Updating neuroglancer config for dir structure

* Updating raw tilt import to tiltseries_id directory

* Migration script

* Update for tomograms

* Adding support for alignments

* Migrating annotation precompute

* Migrating collection_metadata

* Migrating rawtilt and gains

* Clean up

* Clean up

* Clean up

* Enabling the move

* Adding metadata updates

* Uncommenting deletion

* Adding check before move

* Adding wdl

* minor fixes

* Updating enqueue script

* Updating annotation file names

* Updating the tomogram key photo path

* Fixing tests

* Fix key_photo migration

* Handling default alignments

* fix path standardization when bucket name not in path

* Lint.

---------

Co-authored-by: Jessica Gadling <jgadling@chanzuckerberg.com>
manasaV3 and jgadling authored Oct 21, 2024
1 parent a589d3b commit d061960
Showing 6 changed files with 553 additions and 5 deletions.
51 changes: 51 additions & 0 deletions ingestion_tools/infra/schema_migrate-v0.0.1.wdl
@@ -0,0 +1,51 @@
version 1.0


task cryoet_folder_migration_workflow {
    input {
        String docker_image_id
        String aws_region
        String config_file
        String output_bucket
        String flags
    }

    command <<<
        set -euxo pipefail
        export PYTHONUNBUFFERED=1
        python --version 1>&2
        ls -l 1>&2
        pwd 1>&2
        cd /usr/src/app/ingestion_tools/scripts
        set +x
        echo python migrate_folder_structure.py migrate ~{config_file} ~{output_bucket} ~{flags} 1>&2
        python migrate_folder_structure.py migrate ~{config_file} ~{output_bucket} ~{flags} 1>&2
    >>>

    runtime {
        docker: docker_image_id
    }
}

workflow cryoet_folder_migration {
    input {
        String docker_image_id = "cryoet_data_ingestion:latest"
        String aws_region = "us-west-2"
        String config_file
        String output_bucket
        String flags
    }

    call cryoet_folder_migration_workflow {
        input:
            docker_image_id = docker_image_id,
            aws_region = aws_region,
            config_file = config_file,
            output_bucket = output_bucket,
            flags = flags
    }

    output {
        File log = "output.txt"
    }
}
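Note: the task's command block reduces to a single CLI call. Below is a minimal local sketch of that invocation, assuming the script layout baked into the Docker image; the config file, bucket name, and flag values are illustrative placeholders, not real values.

# Sketch of the command the WDL task runs, invoked directly rather than via SWIPE.
import subprocess

subprocess.run(
    [
        "python",
        "migrate_folder_structure.py",
        "migrate",
        "dataset_configs/example.yaml",   # placeholder for ~{config_file}
        "example-cryoet-bucket",          # placeholder for ~{output_bucket}
        "--filter-run-name", "^TS_001$",  # example of what ~{flags} may carry
    ],
    cwd="/usr/src/app/ingestion_tools/scripts",
    check=True,
)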
10 changes: 10 additions & 0 deletions ingestion_tools/scripts/common/fs.py
@@ -67,6 +67,10 @@ def exists(self, path: str) -> bool:
    def read_block(self, path: str, start: int | None = None, end: int | None = None) -> str:
        pass

    @abstractmethod
    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
        pass


class S3Filesystem(FileSystemApi):
    def __init__(self, force_overwrite: bool, client_kwargs: None | dict[str, str] = None, **kwargs):
@@ -169,6 +173,9 @@ def read_block(self, path: str, start: int | None = None, end: int | None = None) -> str:

        return local_dest_file

    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
        self.s3fs.mv(src_path, dest_path, **kwargs)


class LocalFilesystem(FileSystemApi):
    def __init__(self, force_overwrite: bool):
@@ -203,3 +210,6 @@ def push(self, path: str) -> None:

    def exists(self, path: str) -> bool:
        return os.path.exists(path)

    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
        # kwargs are accepted for interface parity with S3Filesystem but are
        # not used for local moves.
        shutil.move(src_path, dest_path)
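With both backends implementing move, callers can stay backend-agnostic through the FileSystemApi abstraction. A hedged usage sketch, assuming the get_fs_api factory used in enqueue_runs.py below; the bucket and key names are placeholders.

from common.fs import FileSystemApi

# An S3-backed handle; mode="local" would route move() to shutil.move instead of s3fs.mv.
fs = FileSystemApi.get_fs_api(mode="s3", force_overwrite=False)

# Relocate a metadata file into the new id-based directory layout.
# Both paths are illustrative placeholders.
fs.move(
    "example-bucket/10000/TS_001/Tomograms/tomogram_metadata.json",
    "example-bucket/10000/TS_001/Tomograms/100/tomogram_metadata.json",
)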
90 changes: 86 additions & 4 deletions ingestion_tools/scripts/enqueue_runs.py
Expand Up @@ -20,9 +20,11 @@
from importers.deposition import DepositionImporter
from importers.run import RunImporter
from importers.utils import IMPORTERS
from migrate_folder_structure import migrate_common_options
from standardize_dirs import common_options as ingest_common_options

from common.config import DepositionImportConfig
from common.finders import DestinationFilteredMetadataFinder
from common.fs import FileSystemApi

logger = logging.getLogger("db_import")
Expand Down Expand Up @@ -124,10 +126,7 @@ def run_job(
}

session = Session(region_name=aws_region)
client = session.client(
service_name="stepfunctions",
)

client = session.client(service_name="stepfunctions")
return client.start_execution(
stateMachineArn=state_machine_arn,
name=execution_name,
Expand Down Expand Up @@ -590,5 +589,88 @@ def sync(
)


@cli.command()
@click.argument("config_file", required=True, type=str)
@click.argument("output_bucket", required=True, type=str)
@click.option("--migrate-everything", is_flag=True, default=False)
@click.option(
"--swipe-wdl-key",
type=str,
required=True,
default="schema_migrate-v0.0.1.wdl",
help="Specify wdl key for custom workload",
)
@enqueue_common_options
@migrate_common_options
@click.pass_context
def migrate(
ctx,
config_file: str,
output_bucket: str,
migrate_everything: bool,
swipe_wdl_key: str,
**kwargs,
):
handle_common_options(ctx, kwargs)
fs_mode = "s3"
fs = FileSystemApi.get_fs_api(mode=fs_mode, force_overwrite=False)

config = DepositionImportConfig(fs, config_file, output_bucket, output_bucket, IMPORTERS)
# config.load_map_files()
filter_runs = [re.compile(pattern) for pattern in kwargs.get("filter_run_name", [])]
exclude_runs = [re.compile(pattern) for pattern in kwargs.get("exclude_run_name", [])]
filter_datasets = [re.compile(pattern) for pattern in kwargs.get("filter_dataset_name", [])]
exclude_datasets = [re.compile(pattern) for pattern in kwargs.get("exclude_dataset_name", [])]

# Always iterate over depostions, datasets and runs.
for dataset in DatasetImporter.finder(config):
if list(filter(lambda x: x.match(dataset.name), exclude_datasets)):
print(f"Excluding {dataset.name}..")
continue
if filter_datasets and not list(filter(lambda x: x.match(dataset.name), filter_datasets)):
print(f"Skipping {dataset.name}..")
continue
print(f"Processing dataset: {dataset.name}")

futures = []
with ProcessPoolExecutor(max_workers=ctx.obj["parallelism"]) as workerpool:
finder = DestinationFilteredMetadataFinder([], RunImporter)
for result_path in finder.find(config, {"dataset_name": dataset.name, "run_name": "*"}):
run_name = os.path.basename(os.path.dirname(result_path))
if list(filter(lambda x: x.match(run_name), exclude_runs)):
print(f"Excluding {run_name}..")
continue
if filter_runs and not list(filter(lambda x: x.match(run_name), filter_runs)):
print(f"Skipping {run_name}..")
continue
print(f"Processing {run_name}...")

                per_run_args = {}
                excluded_args = ["filter_dataset_name", "filter_run_name"]
                for k, v in kwargs.items():
                    # Skip the dataset/run filter kwargs; they are re-added below,
                    # scoped to this dataset and run. Use "continue" (not "break")
                    # so the remaining kwargs are still copied.
                    if any(substring in k for substring in excluded_args):
                        continue
                    per_run_args[k] = v
                new_args = to_args(migrate_everything=migrate_everything, **per_run_args)  # make a copy
                new_args.append(f"--filter-dataset-name '^{dataset.name}$'")
                new_args.append(f"--filter-run-name '^{run_name}$'")

                dataset_id = dataset.name
                execution_name = f"{int(time.time())}-migrate-ds{dataset_id}-run{run_name}"

                # execution name greater than 80 chars causes boto ValidationException
                if len(execution_name) > 80:
                    execution_name = execution_name[-80:]

                wdl_args = {"config_file": config_file, "output_bucket": output_bucket, "flags": " ".join(new_args)}

                futures.append(
                    workerpool.submit(
                        partial(run_job, execution_name, wdl_args, swipe_wdl_key=swipe_wdl_key, **ctx.obj),
                    ),
                )
        wait(futures)


if __name__ == "__main__":
    cli()
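For clarity, the dataset/run selection above is exclusion-first: an exclude match always wins, and when an include list is present it must match. A distilled sketch of the same semantics, with illustrative patterns; the real ones arrive via the --filter-run-name / --exclude-run-name options and are compiled exactly as in migrate() above.

import re

filter_runs = [re.compile("^TS_00")]
exclude_runs = [re.compile("^TS_001$")]

def should_process(run_name: str) -> bool:
    if any(p.match(run_name) for p in exclude_runs):
        return False  # exclusion always wins
    if filter_runs and not any(p.match(run_name) for p in filter_runs):
        return False  # an include list was given and nothing matched
    return True

assert should_process("TS_002")       # matches include, not excluded
assert not should_process("TS_001")   # explicitly excluded
assert not should_process("run_005")  # include list given, no match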
4 changes: 3 additions & 1 deletion ingestion_tools/scripts/importers/tomogram.py
Expand Up @@ -68,6 +68,7 @@ def __init__(
path: str,
allow_imports: bool,
parents: dict[str, Any],
alignment_metadata_path: str = None,
):
super().__init__(
config=config,
Expand All @@ -77,7 +78,8 @@ def __init__(
parents=parents,
allow_imports=allow_imports,
)
self.alignment_metadata_path = config.to_formatted_path(self.get_alignment_metadata_path())

self.alignment_metadata_path = config.to_formatted_path(alignment_metadata_path or self.get_alignment_metadata_path())
self.identifier = TomogramIdentifierHelper.get_identifier(
config,
self.get_base_metadata(),
Expand Down
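The constructor change makes the alignment metadata path caller-overridable, with the importer's own lookup as the fallback. A small sketch of the "or" idiom using hypothetical names; note that any falsy value, including an empty string, triggers the fallback.

# Hypothetical helper mirroring the fallback above; names and paths are illustrative.
def resolve_alignment_path(explicit: str | None, derived: str) -> str:
    return explicit or derived

assert resolve_alignment_path(None, "Alignments/100/alignment_metadata.json") == "Alignments/100/alignment_metadata.json"
assert resolve_alignment_path("Alignments/101/alignment_metadata.json", "ignored") == "Alignments/101/alignment_metadata.json"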
1 change: 1 addition & 0 deletions ingestion_tools/scripts/importers/visualization_config.py
Expand Up @@ -32,6 +32,7 @@ def import_item(self) -> None:
if not self.is_import_allowed():
print(f"Skipping import of {self.name}")
return

tomogram = self.get_tomogram()
if not tomogram.get_base_metadata().get("is_visualization_default"):
print("Skipping import for tomogram that is not configured for default_visualization")
Expand Down