Commit

Update utilities.

jgadling committed Oct 17, 2024
1 parent 96e6268 commit c15f28f

Showing 4 changed files with 104 additions and 34 deletions.
5 changes: 4 additions & 1 deletion apiv2/db_import/importers/alignment.py
@@ -46,9 +46,12 @@ class AlignmentItem(ItemDBImporter):
     model_class = models.Alignment
 
     def load_computed_fields(self):
+        if self.model_args["alignment_method"] == "undefined":
+            self.model_args["alignment_method"] = None
         self.model_args["s3_alignment_metadata"] = self.get_s3_url(self.input_data["file"])
         self.model_args["https_alignment_metadata"] = self.get_https_url(self.input_data["file"])
-        self.model_args["tiltseries_id"] = self.config.get_tiltseries_by_path(self.input_data["tiltseries_path"])
+        if self.input_data.get("tiltseries_path"):
+            self.model_args["tiltseries_id"] = self.config.get_tiltseries_by_path(self.input_data["tiltseries_path"])
         self.model_args["run_id"] = self.input_data["run"].id
 
 
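The two new guards make the behavior explicit: an alignment_method of "undefined" is stored as NULL, and the tilt-series lookup only runs when the metadata actually names a tiltseries_path (the old code indexed the key unconditionally, which raises KeyError when it is absent). A minimal standalone sketch, with dict literals standing in for the importer's model_args and input_data:

    # Illustration only; the real logic runs inside AlignmentItem.load_computed_fields.
    model_args = {"alignment_method": "undefined"}
    input_data = {"file": "alignment_metadata.json"}  # no "tiltseries_path" key

    if model_args["alignment_method"] == "undefined":
        model_args["alignment_method"] = None  # store NULL rather than the literal string

    if input_data.get("tiltseries_path"):
        pass  # would call config.get_tiltseries_by_path(...) here; skipped for this input

    assert model_args["alignment_method"] is None
    assert "tiltseries_id" not in model_args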
6 changes: 4 additions & 2 deletions apiv2/db_import/importers/tomogram.py
@@ -32,7 +32,7 @@ class TomogramItem(ItemDBImporter):
     }
 
     def normalize_to_unknown_str(self, value: str) -> str:
-        return value if value else "Unknown"
+        return value.replace(" ", "_") if value else "Unknown"
 
     def generate_neuroglancer_data(self, path) -> str:
         config = self.config.load_key_json(path, is_file_required=False)
@@ -55,7 +55,9 @@ def load_computed_fields(self):
             "s3_mrc_file": self.get_s3_url(self.input_data["mrc_file"]),
             "https_mrc_file": self.get_https_url(self.input_data["mrc_file"]),
             # TODO: Add alignment_id once we have an alignment importer.
-            "alignment_id": self.config.get_alignment_by_path(self.get_s3_url(self.input_data["alignment_metadata_path"])),
+            "alignment_id": self.config.get_alignment_by_path(
+                self.get_s3_url(self.input_data["alignment_metadata_path"])
+            ),
             "key_photo_url": None,
             "key_photo_thumbnail_url": None,
             "is_portal_standard": self.input_data.get("is_standardized") or False,
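The normalize_to_unknown_str change swaps spaces for underscores, presumably so free-text values line up with underscore-separated enum members downstream. A quick sketch of the expected behavior, assuming the helper is otherwise unchanged:

    def normalize_to_unknown_str(value: str) -> str:
        # Standalone copy of the updated helper, for illustration only.
        return value.replace(" ", "_") if value else "Unknown"

    assert normalize_to_unknown_str("WBP") == "WBP"
    assert normalize_to_unknown_str("Weighted back projection") == "Weighted_back_projection"
    assert normalize_to_unknown_str("") == "Unknown"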
40 changes: 40 additions & 0 deletions apiv2/scripts/delete_dataset.py
@@ -0,0 +1,40 @@
# Deletes a dataset from a v2 database.
import sqlalchemy as sa
import contextlib
import json
import os
from concurrent.futures import ProcessPoolExecutor, as_completed

import click
import cryoet_data_portal as cdp
import requests
from database import models
from support.enums import tomogram_reconstruction_method_enum as reconstruction_enum

from platformics.database.connect import init_sync_db


@click.command()
@click.argument("dataset_id", required=True, type=int)
@click.option("--i-am-super-sure", help="this argument must be yes", type=str)
@click.option("--db-uri", help="Database URI", type=str)
def delete_dataset(dataset_id: int, i_am_super_sure: str, db_uri: str):
    if not db_uri:
        db_uri = (
            f"postgresql+psycopg://{os.environ['PLATFORMICS_DATABASE_USER']}:{os.environ['PLATFORMICS_DATABASE_PASSWORD']}@{os.environ['PLATFORMICS_DATABASE_HOST']}:{os.environ['PLATFORMICS_DATABASE_PORT']}/{os.environ['PLATFORMICS_DATABASE_NAME']}"
        )

    print(f"DELETING DATASET {dataset_id}")
    if i_am_super_sure != "yes":
        print("You must specify '--i-am-super-sure yes' to delete a dataset")
        exit(1)
    db = init_sync_db(db_uri)
    with db.session() as session:
        obj = session.scalars(sa.select(models.Dataset).where(models.Dataset.id == dataset_id)).one()
        session.delete(obj)
        session.flush()
        session.commit()


if __name__ == "__main__":
    delete_dataset()
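A usage sketch via click's test runner (the import path, dataset id, and database URI below are placeholders, not values from the commit):

    from click.testing import CliRunner

    from delete_dataset import delete_dataset  # adjust to wherever apiv2/scripts is importable from

    runner = CliRunner()
    # The safety flag is checked before anything is deleted; omitting
    # "--i-am-super-sure yes" makes the command exit with status 1.
    result = runner.invoke(
        delete_dataset,
        ["12345", "--i-am-super-sure", "yes", "--db-uri", "postgresql+psycopg://user:pass@localhost:5432/cryoet"],
    )
    print(result.exit_code, result.output)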
87 changes: 56 additions & 31 deletions apiv2/scripts/scrape.py
@@ -15,9 +15,6 @@
 # cleanup:
 # delete from deposition_type; delete from annotation_author; delete from annotation_method_link; delete from dataset_author; delete from deposition_author; delete from annotation_file ; delete from annotation_shape ; delete from dataset_funding; delete from tomogram_author; delete from tomogram; delete from annotation; delete from annotation; delete from tomogram_voxel_spacing; delete from per_section_alignment_parameters; delete from alignment; delete from frame; delete from tiltseries; delete from gain_file; delete from frame; delete from run; delete from run; delete from dataset; delete from deposition;
 
-# CLIENT_URL="https://graphql-cryoet-api.cryoet.staging.si.czi.technology/v1/graphql"
-CLIENT_URL = "https://graphql.cryoetdataportal.cziscience.com/v1/graphql"
-
 
 # Adapted from https://github.com/sqlalchemy/sqlalchemy/wiki/UniqueObject
 def get_or_create(session, cls, row_id, filters, data):
@@ -307,7 +304,6 @@ def add(session, model, item, parents):
         "reconstruction_software": remote_item["reconstruction_software"],
         "is_author_submitted": remote_item["is_canonical"],
         "is_portal_standard": False,
-        "is_standardized": False,
         "is_visualization_default": remote_item["is_canonical"],
         "s3_omezarr_dir": remote_item["s3_omezarr_dir"],
         "https_omezarr_dir": remote_item["https_omezarr_dir"],
Expand Down Expand Up @@ -356,11 +352,9 @@ def add(session, model, item, parents):
return item


def import_deposition(deposition_id: int):
db = init_sync_db(
f"postgresql+psycopg://{os.environ['PLATFORMICS_DATABASE_USER']}:{os.environ['PLATFORMICS_DATABASE_PASSWORD']}@{os.environ['PLATFORMICS_DATABASE_HOST']}:{os.environ['PLATFORMICS_DATABASE_PORT']}/{os.environ['PLATFORMICS_DATABASE_NAME']}",
)
client = cdp.Client(CLIENT_URL)
def db_import_deposition(client_url, db_uri, deposition_id: int):
db = init_sync_db(db_uri)
client = cdp.Client(client_url)
dep = cdp.Deposition.get_by_id(client, deposition_id)
with db.session() as session:
print(f"processing {dep.id}")
@@ -375,7 +369,7 @@ def import_deposition(deposition_id: int):
 
 
 # Method links isn't exposed by the v2 client for some reason
-def fetch_method_links(annotation):
+def fetch_method_links(client_url, annotation):
     headers = {
         "Content-type": "application/json",
     }
@@ -390,19 +384,17 @@ def fetch_method_links(annotation):
         % annotation.id
     )
     payload = json.dumps({"query": query, "variables": None, "operationName": "MyQuery"})
-    res = requests.post(CLIENT_URL, headers=headers, data=payload)
+    res = requests.post(client_url, headers=headers, data=payload)
     data = res.json()
     anno = data["data"]["annotations"][0]
     if anno and anno.get("method_links"):
         return anno["method_links"]
     return []
 
 
-def import_dataset(dataset_id: int):
-    db = init_sync_db(
-        f"postgresql+psycopg://{os.environ['PLATFORMICS_DATABASE_USER']}:{os.environ['PLATFORMICS_DATABASE_PASSWORD']}@{os.environ['PLATFORMICS_DATABASE_HOST']}:{os.environ['PLATFORMICS_DATABASE_PORT']}/{os.environ['PLATFORMICS_DATABASE_NAME']}",
-    )
-    client = cdp.Client(CLIENT_URL)
+def db_import_dataset(client_url, db_uri, dataset_id: int):
+    db = init_sync_db(db_uri)
+    client = cdp.Client(client_url)
     dataset = cdp.Dataset.get_by_id(client, dataset_id)
     with db.session() as session:
         print(f"processing {dataset_id}")
@@ -432,7 +424,7 @@ def import_dataset(dataset_id: int):
             for anno in cdp.Annotation.find(client, [cdp.Annotation.tomogram_voxel_spacing_id == vs.id]):
                 a = add(session, models.Annotation, anno, parents)
                 parents["annotation_id"] = a.id
-                for methodlink in fetch_method_links(anno):
+                for methodlink in fetch_method_links(client_url, anno):
                     add(session, models.AnnotationMethodLink, methodlink, parents)
                 for annofile in cdp.AnnotationFile.find(client, [cdp.AnnotationFile.annotation_id == anno.id]):
                     add(session, models.AnnotationFile, annofile, parents)
@@ -454,32 +446,65 @@ def import_dataset(dataset_id: int):
 
 
 @click.command()
-@click.option("--skip-until", help="skip all datasets until and including this one")
-@click.option("--parallelism", help="how many processes to run in parallel", required=True, default=10)
-def do_import(skip_until, parallelism):
-    client = cdp.Client(CLIENT_URL)
+# Either prod or staging
+@click.argument("env", required=True, type=str)
+@click.option("--skip-until", help="skip all datasets until and including this one", type=int)
+@click.option("--import-dataset", help="Import specific datasets (multiple ok)", multiple=True, type=int)
+@click.option("--import-deposition", help="Import specific depositions (multiple ok)", multiple=True, type=int)
+@click.option("--db-uri", help="Database URI")
+@click.option(
+    "--import-all-depositions", is_flag=True, help="Whether to import depositions", required=False, default=False
+)
+@click.option("--parallelism", help="how many processes to run in parallel", required=True, default=10, type=int)
+def do_import(env, import_dataset, import_deposition, db_uri, import_all_depositions, skip_until, parallelism):
     futures = []
+    if env not in ["prod", "staging"]:
+        print("Env must be 'prod' or 'staging'")
+        exit(1)
+    if env == "staging":
+        client_url = "https://graphql-cryoet-api.cryoet.staging.si.czi.technology/v1/graphql"
+    else:
+        client_url = "https://graphql.cryoetdataportal.cziscience.com/v1/graphql"
+
+    if not db_uri:
+        db_uri = (
+            f"postgresql+psycopg://{os.environ['PLATFORMICS_DATABASE_USER']}:{os.environ['PLATFORMICS_DATABASE_PASSWORD']}@{os.environ['PLATFORMICS_DATABASE_HOST']}:{os.environ['PLATFORMICS_DATABASE_PORT']}/{os.environ['PLATFORMICS_DATABASE_NAME']}"
+        )
+
+    client = cdp.Client(client_url)
+
     with ProcessPoolExecutor(max_workers=parallelism) as workerpool:
-        depositions = cdp.Deposition.find(client)
-        depositions.sort(key=lambda a: a.id)  # Sort datasets by id
-        for dep in depositions:
-            futures.append(
-                workerpool.submit(
-                    import_deposition,
-                    dep.id,
-                ),
-            )
+        if import_all_depositions or import_deposition:
+            depositions = cdp.Deposition.find(client)
+            depositions.sort(key=lambda a: a.id)  # Sort datasets by id
+            for dep in depositions:
+                if import_deposition:
+                    if dep.id not in import_deposition:
+                        continue
+                futures.append(
+                    workerpool.submit(
+                        db_import_deposition,
+                        client_url,
+                        db_uri,
+                        dep.id,
+                    ),
+                )
         datasets = cdp.Dataset.find(client)
         datasets.sort(key=lambda a: a.id)  # Sort datasets by id
        for dataset in datasets:
            if skip_until:
                if dataset.id == int(skip_until):
                    skip_until = None
                continue
+            if import_dataset:
+                if dataset.id not in import_dataset:
+                    continue
 
             futures.append(
                 workerpool.submit(
-                    import_dataset,
+                    db_import_dataset,
+                    client_url,
+                    db_uri,
                     dataset.id,
                 ),
             )
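With these changes the scraper takes the target environment as a positional argument and can restrict a run to specific IDs. A usage sketch via click's test runner (the import path, IDs, and database URI are placeholders; a real run also needs the PLATFORMICS_DATABASE_* variables or an explicit --db-uri):

    from click.testing import CliRunner

    from scrape import do_import  # adjust to wherever apiv2/scripts is importable from

    runner = CliRunner()
    # Import a single dataset from prod with two worker processes.
    result = runner.invoke(do_import, ["prod", "--import-dataset", "10001", "--parallelism", "2"])

    # Import two specific depositions from staging into an explicit database.
    result = runner.invoke(
        do_import,
        [
            "staging",
            "--import-deposition", "10301",
            "--import-deposition", "10302",
            "--db-uri", "postgresql+psycopg://user:pass@localhost:5432/cryoet",
            "--parallelism", "2",
        ],
    )
    print(result.exit_code, result.output)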
