feat: update apiv2 db ingestion to support new s3 schema (#324)
jgadling authored Oct 21, 2024
1 parent d061960 commit 02441de
Showing 217 changed files with 4,794 additions and 798 deletions.
49 changes: 49 additions & 0 deletions apiv2/db_import/common/config.py
@@ -1,10 +1,15 @@
import json
import logging
import os
from datetime import datetime
from functools import lru_cache
from pathlib import PurePath
from typing import TYPE_CHECKING, Any

import sqlalchemy as sa
from botocore.exceptions import ClientError
from database import models
from s3fs import S3FileSystem
from sqlalchemy.orm import Session

if TYPE_CHECKING:
@@ -17,6 +22,7 @@

class DBImportConfig:
s3_client: S3Client
s3fs: S3FileSystem
bucket_name: str
s3_prefix: str
https_prefix: str
@@ -25,19 +31,44 @@ class DBImportConfig:
def __init__(
self,
s3_client: S3Client,
s3fs: S3FileSystem,
bucket_name: str,
https_prefix: str,
session: Session,
):
self.s3_client = s3_client
self.s3fs = s3fs
self.bucket_name = bucket_name
self.s3_prefix = f"s3://{bucket_name}"
self.https_prefix = https_prefix if https_prefix else "https://files.cryoetdataportal.cziscience.com"
self.session = session
self.deposition_map: dict[int, models.Deposition] = {}

def get_db_session(self) -> Session:
return self.session

def load_deposition_map(self) -> None:
session = self.get_db_session()
for item in session.scalars(sa.select(models.Deposition)).all():
self.deposition_map[item.id] = item

@lru_cache # noqa
def get_alignment_by_path(self, path: str) -> int | None:
session = self.get_db_session()
for item in session.scalars(
sa.select(models.Alignment).where(models.Alignment.s3_alignment_metadata == path),
).all():
return item.id

@lru_cache # noqa
def get_tiltseries_by_path(self, path: str) -> int | None:
session = self.get_db_session()
# '_' is a wildcard character in sql LIKE queries, so we need to escape them!
escaped_path = os.path.dirname(path).replace("_", "\\_")
path = os.path.join(self.s3_prefix, escaped_path, "%")
item = session.scalars(sa.select(models.Tiltseries).where(models.Tiltseries.s3_mrc_file.like(path))).one()
return item.id

def find_subdirs_with_files(self, prefix: str, target_filename: str) -> list[str]:
paginator = self.s3_client.get_paginator("list_objects_v2")
logger.info("looking for prefix %s", prefix)
@@ -53,6 +84,22 @@ def find_subdirs_with_files(self, prefix: str, target_filename: str) -> list[str
continue
return result

def recursive_glob_s3(self, prefix: str, glob_string: str) -> list[str]:
# Returns path to a matched glob.
s3 = self.s3fs
prefix = prefix.rstrip("/")
path = os.path.join(self.bucket_name, prefix, glob_string)
logger.info("Recursively looking for files in %s", path)
return s3.glob(path)

def recursive_glob_prefix(self, prefix: str, glob_string: str) -> list[str]:
# Returns a prefix that contains a given glob'd path but not the path to the found item itself.
s3 = self.s3fs
prefix = prefix.rstrip("/")
path = os.path.join(self.bucket_name, prefix, glob_string)
logger.info("Recursively looking for files in %s", path)
return [os.path.dirname(item[len(self.bucket_name) + 1 :]) for item in s3.glob(path)]

def glob_s3(self, prefix: str, glob_string: str, is_file: bool = True):
paginator = self.s3_client.get_paginator("list_objects_v2")
if prefix.startswith("s3://"):
@@ -74,6 +121,8 @@ def load_key_json(self, key: str, is_file_required: bool = True) -> dict[str, An
else it will return None.
"""
try:
if key.startswith(self.bucket_name):
key = key[len(self.bucket_name) + 1 :]
text = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
return json.loads(text["Body"].read())
except ClientError as ex:
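The new helpers on DBImportConfig resolve globs through s3fs while the boto3 client is kept for object reads and paginated listing, so the importer entrypoint now has to construct both clients. Below is a minimal sketch of how the updated config might be wired and how the two new glob helpers differ; the bucket name, prefixes, glob patterns, and session setup are placeholders rather than values from this commit.

import boto3
from s3fs import S3FileSystem
from sqlalchemy.orm import Session

from db_import.common.config import DBImportConfig

s3_client = boto3.client("s3")
fs = S3FileSystem()       # credentials / anonymous mode depends on the deployment
session: Session = ...    # created by the importer entrypoint; construction omitted here

config = DBImportConfig(
    s3_client=s3_client,
    s3fs=fs,
    bucket_name="example-bucket",   # placeholder bucket
    https_prefix="",                # empty string falls back to the portal's default files URL
    session=session,
)

# recursive_glob_s3 returns the full "bucket/key" path of every match ...
matches = config.recursive_glob_s3("depositions_metadata", "*/metadata.json")

# ... while recursive_glob_prefix strips the bucket and the matched filename,
# returning only the containing prefix for each match.
prefixes = config.recursive_glob_prefix("depositions_metadata", "*/metadata.json")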
111 changes: 106 additions & 5 deletions apiv2/db_import/common/finders.py
@@ -1,29 +1,66 @@
import json
import logging
import os
import re
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import TYPE_CHECKING, Any

from db_import.common.config import DBImportConfig

from platformics.database.models.base import Base

logger = logging.getLogger("db_import")
logger = logging.getLogger("finders")

if TYPE_CHECKING:
from db_import.importers.base import ItemDBImporter
else:
ItemDBImporter = Any


@lru_cache(maxsize=30) # noqa
def cached_metadata_load(s3fs, key):
data = json.loads(s3fs.cat_file(key))
return data


class ItemFinder(ABC):
@abstractmethod
def __init__(self, config: DBImportConfig, **kwargs):
-        pass
+        self.config = config

@abstractmethod
def find(self, item_importer: ItemDBImporter) -> list[ItemDBImporter]:
pass

def strip_bucket_from_path(self, path):
if path.startswith(self.config.bucket_name):
return path[len(self.config.bucket_name) + 1 :]
return path

def recursive_glob(self, prefix: str, target_glob: str) -> list[str]:
s3 = self.config.s3fs
prefix = prefix.rstrip("/")
logger.info("Recursively looking for files in %s/%s", prefix, target_glob)
return s3.glob(os.path.join(prefix, target_glob))

def glob_s3(self, prefix: str, glob_string: str, is_file: bool = True):
s3 = self.config.s3fs
prefix = prefix.rstrip("/")
if glob_string:
prefix = os.path.join(prefix, glob_string)
logger.info("Looking for files in %s", prefix)
for item in s3.glob(prefix):
if is_file and not s3.isfile(item):
continue
yield item

def load_metadata(self, key: str, is_file_required: bool = True) -> dict[str, Any] | None:
"""
Loads file matching the key value as json. If file does not exist, will raise error if is_file_required is True
else it will return None.
"""
return cached_metadata_load(self.config.s3fs, key)


class FileFinder(ItemFinder):
def __init__(self, config: DBImportConfig, path: str, glob: str, match_regex: str | None):
@@ -36,9 +73,73 @@ def __init__(self, config: DBImportConfig, path: str, glob: str, match_regex: st

def find(self, item_importer: ItemDBImporter, parents: dict[str, Base]) -> list[ItemDBImporter]:
results: list[ItemDBImporter] = []
-        for file in self.config.glob_s3(self.path, self.glob, is_file=True):
+        for file in self.glob_s3(self.path, self.glob, is_file=True):
if self.match_regex.match(file):
data = {"file": file}
data = {"file": self.strip_bucket_from_path(file)}
data.update(parents)
results.append(item_importer(self.config, data))
return results


class JsonDataFinder(ItemFinder):
def __init__(
self,
config: DBImportConfig,
path: str,
list_key: str | None = None,
match_key: str | None = None,
match_value: str | None = None,
):
self.config = config
self.path = path
self.list_key = []
if list_key:
self.list_key = list_key
self.match_key = match_key
self.match_value = match_value

def find(self, item_importer: ItemDBImporter, parents: dict[str, Base]) -> list[ItemDBImporter]:
results: list[ItemDBImporter] = []
json_data = self.load_metadata(self.path)
original_json_data = json_data
for key in self.list_key:
try:
json_data = json_data[key]
except KeyError:
return []
if self.match_key and self.match_value:
json_data = [item for item in json_data if item.get(self.match_key) == self.match_value]
for idx, item in enumerate(json_data):
item["file"] = self.path
item["index"] = idx + 1
item["original_data"] = original_json_data
item.update(parents)
results.append(item_importer(self.config, item))
return results


class MetadataFileFinder(ItemFinder):
def __init__(self, config: DBImportConfig, path: str, file_glob: str, list_key: str | None = None):
self.config = config
self.path = path
self.file_glob = file_glob
self.list_key = list_key

def find(self, item_importer: ItemDBImporter, parents: dict[str, Base]) -> list[ItemDBImporter]:
results: list[ItemDBImporter] = []
for file in self.recursive_glob(self.path, self.file_glob):
json_data = self.load_metadata(file)
if self.list_key:
for idx, item in enumerate(json_data[self.list_key]):
data = item
data["file"] = self.strip_bucket_from_path(file)
data["index"] = idx + 1 # Use a 1-based index for humans.
data.update(parents)
results.append(item_importer(self.config, data))
# If we have a list key, we don't want to continue
continue
data = {"file": self.strip_bucket_from_path(file)}
data.update(parents)
data.update(json_data)
results.append(item_importer(self.config, data))
return results
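The finder classes above now glob through the shared S3FileSystem, cache metadata reads, and hand bucket-relative paths to the item importers. A hedged sketch of driving one of the new finders follows, assuming a DBImportConfig wired as in the earlier sketch; the prefix, glob, and parents mapping are placeholders, and the ItemDBImporter import path follows the TYPE_CHECKING block in this file.

from db_import.common.config import DBImportConfig
from db_import.common.finders import MetadataFileFinder
from db_import.importers.base import ItemDBImporter  # import path per the TYPE_CHECKING block above


def collect_importers(
    config: DBImportConfig,
    importer_cls: type[ItemDBImporter],
    parents: dict,
) -> list[ItemDBImporter]:
    # Placeholder prefix and glob, used only for illustration.
    finder = MetadataFileFinder(
        config,
        path="<dataset_id>/<run_name>/Tomograms",
        file_glob="**/tomogram_metadata.json",
    )
    # Each matched metadata file is parsed once (memoized by cached_metadata_load),
    # merged with the parent rows, and wrapped in one importer_cls instance.
    return finder.find(importer_cls, parents=parents)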