
feat: Script to migrate the directory structure #311

Merged · 61 commits · Oct 21, 2024
7607584
Updating tomograms
manasaV3 Sep 24, 2024
076c3b5
updates to tomogram processing
manasaV3 Sep 26, 2024
b3ab2d3
updates to tomogram key photo filename
manasaV3 Sep 26, 2024
8aee0ee
updates to tomogram neuroglance filepath
manasaV3 Sep 26, 2024
15b7a40
cleaning up tomogram and related entities
manasaV3 Sep 26, 2024
5125982
Updating tiltseries with identifiers
manasaV3 Sep 26, 2024
f1002b0
Updating alignment for tomogram relations
manasaV3 Sep 26, 2024
c79936b
Adding alignment metadatapath to annotations and tomograms
manasaV3 Sep 30, 2024
74a3247
Merge remote-tracking branch 'origin' into multi_tomo
manasaV3 Oct 1, 2024
a42c297
Merge remote-tracking branch 'origin' into multi_tomo
manasaV3 Oct 1, 2024
c1fc0a2
Updating the paths
manasaV3 Oct 1, 2024
60cd302
Updating the paths
manasaV3 Oct 1, 2024
7d01008
Updating failed tests
manasaV3 Oct 1, 2024
98e9163
Working tests clean up needed
manasaV3 Oct 1, 2024
08f0bed
fix: workaround for docker compose bugs. (#295)
jgadling Oct 1, 2024
b89d2c6
fix: support filtering entities by related object id's (#296)
jgadling Oct 1, 2024
dd5f547
chore: add seed script. (#297)
jgadling Oct 1, 2024
c5770ea
feat: Updates to the alignment entity (#298)
manasaV3 Oct 2, 2024
92d45ec
chore: Documenting the Jensen config generation (#293)
manasaV3 Oct 2, 2024
d3c0007
chore: Update footer of release please PR's. (#299)
jgadling Oct 2, 2024
04bedde
Adding tests
manasaV3 Oct 3, 2024
8f6fe81
Merge remote-tracking branch 'origin' into multi_tomo
manasaV3 Oct 4, 2024
06d3a7c
Merge remote-tracking branch 'origin' into multi_tomo
manasaV3 Oct 4, 2024
807f782
cleaning up the paths
manasaV3 Oct 4, 2024
ca54b91
Cleaning up paths
manasaV3 Oct 4, 2024
25df3a4
fix tests
manasaV3 Oct 7, 2024
84a58d5
Updating tomogram id generation
manasaV3 Oct 7, 2024
a656aad
Updating viz_config generation
manasaV3 Oct 7, 2024
065cf49
Making id directories
manasaV3 Oct 7, 2024
f677d01
Making id directories
manasaV3 Oct 7, 2024
785d9c2
Making annotation id directory
manasaV3 Oct 9, 2024
8791a7d
Cleaning up paths with id
manasaV3 Oct 9, 2024
b0efcb2
Updating neuroglancer precompute for dir structure
manasaV3 Oct 9, 2024
a6314f5
Updating neuroglancer config for dir structure
manasaV3 Oct 9, 2024
c1afde9
Updating raw tilt import to tiltseries_id directory
manasaV3 Oct 9, 2024
c84e3b3
Migration script
manasaV3 Oct 10, 2024
497ff54
Update for tomograms
manasaV3 Oct 10, 2024
a03a091
Adding support for alignments
manasaV3 Oct 10, 2024
1df8ee3
Migrating annotation precompute
manasaV3 Oct 14, 2024
79eae8b
Migrating collection_metadata
manasaV3 Oct 14, 2024
6d021bd
Migrating rawtilt and gains
manasaV3 Oct 14, 2024
b482db2
Clean up
manasaV3 Oct 14, 2024
c4b6e0c
Clean up
manasaV3 Oct 14, 2024
79b293f
Clean up
manasaV3 Oct 14, 2024
5b1ffaf
Enabling the move
manasaV3 Oct 14, 2024
2e62a97
Adding metadata updates
manasaV3 Oct 14, 2024
d0db03c
Uncommenting deletion
manasaV3 Oct 14, 2024
a1a4874
Adding check before move
manasaV3 Oct 14, 2024
14992be
Adding wdl
manasaV3 Oct 14, 2024
86efbd7
minor fixes
manasaV3 Oct 14, 2024
05317e8
Updating enqueue script
manasaV3 Oct 15, 2024
148026b
Merge branch 'main' into mvenkatakrishnan/migration_scripts
manasaV3 Oct 15, 2024
7c87421
Updating annotation file names
manasaV3 Oct 15, 2024
8fb098a
Updating the tomogram key photo path
manasaV3 Oct 15, 2024
e9ef47b
Fixing tests
manasaV3 Oct 15, 2024
f4abad1
Merge branch 'main' into mvenkatakrishnan/migration_scripts
manasaV3 Oct 15, 2024
f540ab7
Merge branch 'main', remote-tracking branch 'origin' into migrate_tomo
manasaV3 Oct 15, 2024
602f345
Fix key_photo migration
manasaV3 Oct 15, 2024
05f22ed
Handling default alignments
manasaV3 Oct 16, 2024
43f0c74
fix path standardization when bucket name not in path
manasaV3 Oct 16, 2024
da168d0
Lint.
jgadling Oct 21, 2024
51 changes: 51 additions & 0 deletions ingestion_tools/infra/schema_migrate-v0.0.1.wdl
@@ -0,0 +1,51 @@
version 1.0


task cryoet_folder_migration_workflow {
    input {
        String docker_image_id
        String aws_region
        String config_file
        String output_bucket
        String flags
    }

    command <<<
        set -euxo pipefail
        export PYTHONUNBUFFERED=1
        python --version 1>&2
        ls -l 1>&2
        pwd 1>&2
        cd /usr/src/app/ingestion_tools/scripts
        set +x
        echo python migrate_folder_structure.py migrate ~{config_file} ~{output_bucket} ~{flags} 1>&2
        python migrate_folder_structure.py migrate ~{config_file} ~{output_bucket} ~{flags} 1>&2
    >>>

    runtime {
        docker: docker_image_id
    }
}

workflow cryoet_folder_migration {
    input {
        String docker_image_id = "cryoet_data_ingestion:latest"
        String aws_region = "us-west-2"
        String config_file
        String output_bucket
        String flags
    }

    call cryoet_folder_migration_workflow {
        input:
            docker_image_id = docker_image_id,
            aws_region = aws_region,
            config_file = config_file,
            output_bucket = output_bucket,
            flags = flags
    }

    output {
        File log = "output.txt"
    }
}
6 changes: 5 additions & 1 deletion ingestion_tools/scripts/common/config.py
@@ -251,4 +251,8 @@ def to_formatted_path(self, path: str) -> str:
    """
    Returns the s3 key, without the bucket name, for the given path. Helpful for formatting paths in metadata.
    """
-    return os.path.relpath(path, self.output_prefix) if path else None
+    if path:
+        if path.startswith(self.output_prefix):
+            return os.path.relpath(path, self.output_prefix)
+        return path
+    return None
10 changes: 10 additions & 0 deletions ingestion_tools/scripts/common/fs.py
@@ -67,6 +67,10 @@ def exists(self, path: str) -> bool:
    def read_block(self, path: str, start: int | None = None, end: int | None = None) -> str:
        pass

    @abstractmethod
    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
        pass


class S3Filesystem(FileSystemApi):
    def __init__(self, force_overwrite: bool, client_kwargs: None | dict[str, str] = None, **kwargs):
@@ -169,6 +173,9 @@ def read_block(self, path: str, start: int | None = None, end: int | None = None

        return local_dest_file

    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
        self.s3fs.mv(src_path, dest_path, **kwargs)


class LocalFilesystem(FileSystemApi):
    def __init__(self, force_overwrite: bool):
@@ -203,3 +210,6 @@ def push(self, path: str) -> None:

    def exists(self, path: str) -> bool:
        return os.path.exists(path)

    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
        shutil.move(src_path, dest_path)
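The PR adds `move` as an abstract method so both backends (S3 via `s3fs.mv`, local via `shutil.move`) expose the same interface. A trimmed-down sketch of that pattern with only the local backend (class names here are stand-ins, not the project's actual classes):

```python
import os
import shutil
import tempfile
from abc import ABC, abstractmethod


class FileSystem(ABC):
    """Stand-in for the PR's FileSystemApi: every backend must implement move."""

    @abstractmethod
    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
        pass


class LocalFilesystem(FileSystem):
    def move(self, src_path: str, dest_path: str, **kwargs) -> None:
        # shutil.move also works across filesystems, unlike a bare os.rename
        shutil.move(src_path, dest_path)


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "old", "tomogram_metadata.json")
    dest = os.path.join(tmp, "100", "tomogram_metadata.json")
    os.makedirs(os.path.dirname(src))
    os.makedirs(os.path.dirname(dest))
    with open(src, "w") as f:
        f.write("{}")

    LocalFilesystem().move(src, dest)
    print(os.path.exists(dest))  # True
```

Making `move` part of the abstract base lets the migration script relocate files without caring whether it is operating on S3 or a local checkout.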
90 changes: 86 additions & 4 deletions ingestion_tools/scripts/enqueue_runs.py
@@ -20,9 +20,11 @@
from importers.deposition import DepositionImporter
from importers.run import RunImporter
from importers.utils import IMPORTERS
from migrate_folder_structure import migrate_common_options
from standardize_dirs import common_options as ingest_common_options

from common.config import DepositionImportConfig
from common.finders import DestinationFilteredMetadataFinder
from common.fs import FileSystemApi

logger = logging.getLogger("db_import")
@@ -124,10 +126,7 @@ def run_job(
    }

    session = Session(region_name=aws_region)
-    client = session.client(
-        service_name="stepfunctions",
-    )
-
+    client = session.client(service_name="stepfunctions")
    return client.start_execution(
        stateMachineArn=state_machine_arn,
        name=execution_name,
@@ -590,5 +589,88 @@ def sync(
    )


@cli.command()
@click.argument("config_file", required=True, type=str)
@click.argument("output_bucket", required=True, type=str)
@click.option("--migrate-everything", is_flag=True, default=False)
@click.option(
    "--swipe-wdl-key",
    type=str,
    required=True,
    default="schema_migrate-v0.0.1.wdl",
    help="Specify wdl key for custom workload",
)
@enqueue_common_options
@migrate_common_options
@click.pass_context
def migrate(
    ctx,
    config_file: str,
    output_bucket: str,
    migrate_everything: bool,
    swipe_wdl_key: str,
    **kwargs,
):
    handle_common_options(ctx, kwargs)
    fs_mode = "s3"
    fs = FileSystemApi.get_fs_api(mode=fs_mode, force_overwrite=False)

    config = DepositionImportConfig(fs, config_file, output_bucket, output_bucket, IMPORTERS)
    # config.load_map_files()
    filter_runs = [re.compile(pattern) for pattern in kwargs.get("filter_run_name", [])]
    exclude_runs = [re.compile(pattern) for pattern in kwargs.get("exclude_run_name", [])]
    filter_datasets = [re.compile(pattern) for pattern in kwargs.get("filter_dataset_name", [])]
    exclude_datasets = [re.compile(pattern) for pattern in kwargs.get("exclude_dataset_name", [])]

    # Always iterate over depositions, datasets and runs.
    for dataset in DatasetImporter.finder(config):
        if list(filter(lambda x: x.match(dataset.name), exclude_datasets)):
            print(f"Excluding {dataset.name}..")
            continue
        if filter_datasets and not list(filter(lambda x: x.match(dataset.name), filter_datasets)):
            print(f"Skipping {dataset.name}..")
            continue
        print(f"Processing dataset: {dataset.name}")

        futures = []
        with ProcessPoolExecutor(max_workers=ctx.obj["parallelism"]) as workerpool:
            finder = DestinationFilteredMetadataFinder([], RunImporter)
            for result_path in finder.find(config, {"dataset_name": dataset.name, "run_name": "*"}):
                run_name = os.path.basename(os.path.dirname(result_path))
                if list(filter(lambda x: x.match(run_name), exclude_runs)):
                    print(f"Excluding {run_name}..")
                    continue
                if filter_runs and not list(filter(lambda x: x.match(run_name), filter_runs)):
                    print(f"Skipping {run_name}..")
                    continue
                print(f"Processing {run_name}...")

                per_run_args = {}
                excluded_args = ["filter_dataset_name", "filter_run_name"]
                for k, v in kwargs.items():
                    if any(substring in k for substring in excluded_args):
                        break
                    per_run_args[k] = v
                new_args = to_args(migrate_everything=migrate_everything, **per_run_args)  # make a copy
                new_args.append(f"--filter-dataset-name '^{dataset.name}$'")
                new_args.append(f"--filter-run-name '^{run_name}$'")

                dataset_id = dataset.name
                execution_name = f"{int(time.time())}-migrate-ds{dataset_id}-run{run_name}"

                # execution name greater than 80 chars causes boto ValidationException
                if len(execution_name) > 80:
                    execution_name = execution_name[-80:]

                wdl_args = {"config_file": config_file, "output_bucket": output_bucket, "flags": " ".join(new_args)}

                futures.append(
                    workerpool.submit(
                        partial(run_job, execution_name, wdl_args, swipe_wdl_key=swipe_wdl_key, **ctx.obj),
                    ),
                )
        wait(futures)


if __name__ == "__main__":
    cli()
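Two small pieces of the `migrate` command above are worth isolating: the include/exclude regex selection applied to datasets and runs, and the truncation that keeps Step Functions execution names under boto's 80-character limit. A standalone sketch (helper names are illustrative, not the project's API):

```python
import re
import time


def should_process(name: str, includes: list, excludes: list) -> bool:
    """Exclusion wins; an empty include list means 'match everything'."""
    if any(p.match(name) for p in excludes):
        return False
    if includes and not any(p.match(name) for p in includes):
        return False
    return True


includes = [re.compile(r"^10001$")]
excludes = [re.compile(r"^TS_99")]
print(should_process("10001", includes, excludes))  # True
print(should_process("10002", includes, excludes))  # False


def execution_name(dataset: str, run: str) -> str:
    # Step Functions rejects execution names longer than 80 characters
    # (boto raises ValidationException), so keep the more distinctive tail.
    name = f"{int(time.time())}-migrate-ds{dataset}-run{run}"
    return name[-80:] if len(name) > 80 else name
```

The same anchored patterns (`'^{name}$'`) are then passed back down to the per-run WDL job, so each worker migrates exactly one run.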
4 changes: 3 additions & 1 deletion ingestion_tools/scripts/importers/tomogram.py
@@ -62,6 +62,7 @@ def __init__(
        path: str,
        allow_imports: bool,
        parents: dict[str, Any],
        alignment_metadata_path: str = None,
    ):
        super().__init__(
            config=config,
@@ -71,7 +72,8 @@
            parents=parents,
            allow_imports=allow_imports,
        )
-        self.alignment_metadata_path = config.to_formatted_path(self.get_alignment_metadata_path())
+
+        self.alignment_metadata_path = config.to_formatted_path(alignment_metadata_path or self.get_alignment_metadata_path())
        self.identifier = TomogramIdentifierHelper.get_identifier(
            config,
            self.get_base_metadata(),
1 change: 1 addition & 0 deletions ingestion_tools/scripts/importers/visualization_config.py
@@ -32,6 +32,7 @@ def import_item(self) -> None:
        if not self.is_import_allowed():
            print(f"Skipping import of {self.name}")
            return

        tomogram = self.get_tomogram()
        if not tomogram.get_base_metadata().get("is_visualization_default"):
            print("Skipping import for tomogram that is not configured for default_visualization")