-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[python] Append-mode pre-check logic [WIP]
- Loading branch information
Showing
2 changed files
with
395 additions
and
0 deletions.
There are no files selected for viewing
305 changes: 305 additions & 0 deletions
305
apis/python/src/tiledbsoma/io/registration/signatures.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,305 @@ | ||
import json | ||
from dataclasses import dataclass | ||
from typing import Dict, Optional, Tuple | ||
|
||
import anndata as ad | ||
import pandas as pd | ||
import pyarrow as pa | ||
from typing_extensions import Self | ||
|
||
import tiledbsoma | ||
import tiledbsoma.logging | ||
from tiledbsoma._arrow_types import df_to_arrow | ||
|
||
|
||
def _string_dict_from_arrow_schema(schema: pa.Schema) -> Dict[str, str]:
    """
    Flattens an Arrow schema into a field-name -> type-name dict. The dict form
    is easier on the eyes, trivially JSON-serializable for distributed logging,
    and supports simple key deletion.
    """
    # As detailed elsewhere, Arrow string and large_string must both map to
    # TileDB string (which is large-only), so the two form an equivalence class
    # for this pre-check; similarly for Arrow binary and large_binary.
    _EQUIVALENT_TYPES = {
        "large_string": "string",
        "large_binary": "binary",
    }

    result: Dict[str, str] = {}
    for field_name in schema.names:
        type_name = str(schema.field(field_name).type)
        result[field_name] = _EQUIVALENT_TYPES.get(type_name, type_name)

    # The soma_joinid field is specific to SOMA data but does not exist in
    # AnnData/H5AD. When we pre-check an AnnData/H5AD input to see if it's
    # appendable to an existing SOMA experiment, we must not punish the
    # AnnData/H5AD input for not having a soma_joinid column in its obs/var.
    result.pop("soma_joinid", None)
    return result
|
||
|
||
def _string_dict_from_pandas_dataframe(
    df: pd.DataFrame,
    default_index_name: str,
) -> Dict[str, str]:
    """
    Extracts a field-name -> type-name dict from a Pandas dataframe, for
    compatibility with the ingestor.

    SOMA experiments are indexed by int64 ``soma_joinid``, which is SOMA-only.
    AnnData inputs have a column offered as the index. This can be: named
    explicitly ("obs_id", "var_id", etc.); unnamed (``df.index.name is None``);
    or named "index". In the latter two cases the ingestor allows a rename to
    the user's choice such as "obs_id" and "var_id". Here in the appender
    pre-check logic, we allow the same.
    """
    # Only the schema matters, so keep a single row: reset_index can be
    # expensive on full data. Copy explicitly -- the original used
    # inplace=True on the frame derived via head(), which mutates a derived
    # object and triggers chained-assignment / copy-on-write warnings in
    # modern pandas.
    df = df.head(1).copy()

    if df.index.name is None or df.index.name == "index":
        # An unnamed index materializes as a column called "index"; rename it
        # to the caller-requested default (e.g. "obs_id"/"var_id").
        df = df.reset_index().rename(columns={"index": default_index_name})
    else:
        df = df.reset_index()

    arrow_table = df_to_arrow(df)
    arrow_schema = arrow_table.schema.remove_metadata()
    return _string_dict_from_arrow_schema(arrow_schema)
|
||
|
||
@dataclass
class Signature:
    """
    Support for a schema-compatibility pre-check for append-mode SOMA ingestion.

    If a SOMA experiment already exists and the user wants to append another
    AnnData/H5AD to it --- or, if no SOMA experiment exists yet but the user has
    two or more AnnData/H5AD inputs --- we provide a fail-fast
    schema-compatibility check. Use of this pre-check ensures that we flag
    non-appendable data before any data writes start. In particular, we avoid
    having a SOMA experiment half-appended to.

    At present we require that all schemas are identical, both within the
    ingestor and this pre-checker. This includes ``obs`` and ``var`` field names
    and exact dtypes, as well as ``X``, ``obsm``, and ``varm`` dtypes.

    Later, we can relax constraints: if say the SOMA experiment has ``float64``
    dtype for ``X`` and an H5AD append has ``float32``, we can do the coercion
    in the ingestor as well as allowing it within this pre-checker. Thus, this
    pre-checker logic will evolve over time as the ingestor logic evolves over
    time.
    """

    # Note: string/string dicts are easier to serialize/deserialize than pa.Schema
    obs_schema: Dict[str, str]
    var_schema: Dict[str, str]
    raw_var_schema: Optional[Dict[str, str]]

    # TODO include 'raw' in X_dtypes or no? Different for AnnData and for SOMA.
    # When in doubt, lean SOMA.
    X_dtypes: Dict[str, str]
    raw_X_dtype: Optional[str]

    obsm_dtypes: Dict[str, str]
    varm_dtypes: Dict[str, str]

    @classmethod
    def fromAnnData(
        cls,
        adata: "ad.AnnData",
        *,
        default_obs_field_name: str = "obs_id",
        default_var_field_name: str = "var_id",
        default_X_layer_name: str = "data",
    ) -> "Self":
        """
        Constructs a pre-check signature from AnnData/H5AD input, which can be
        compared against another signature from AnnData/H5AD or SOMA experiment.

        AnnData inputs have a column offered as the index. This can be: named
        explicitly "obs_id", "var_id", etc.; unnamed (adata.obs.index.name is
        None); or named "index". In the latter two cases the ingestor allows a
        rename to the user's choice such as "obs_id" and "var_id". Here in the
        appender pre-check logic, we allow the same.
        """
        obs_schema = _string_dict_from_pandas_dataframe(
            adata.obs, default_obs_field_name
        )
        var_schema = _string_dict_from_pandas_dataframe(
            adata.var, default_var_field_name
        )

        # adata.X is unnamed in AnnData; the ingestor stores it under the
        # caller-chosen default layer name.
        X_dtypes = {
            default_X_layer_name: str(
                tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(adata.X.dtype)
            )
        }
        for X_layer_name, X_layer in adata.layers.items():
            X_dtypes[X_layer_name] = str(
                tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(X_layer.dtype)
            )

        raw_X_dtype = None
        raw_var_schema = None
        if adata.raw is not None:
            raw_X_dtype = str(
                tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(adata.raw.X.dtype)
            )
            raw_var_schema = _string_dict_from_pandas_dataframe(
                adata.raw.var, default_var_field_name
            )

        obsm_dtypes = {
            k: str(tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(v.dtype))
            for k, v in adata.obsm.items()
        }
        varm_dtypes = {
            k: str(tiledbsoma._arrow_types.arrow_type_from_tiledb_dtype(v.dtype))
            for k, v in adata.varm.items()
        }

        return cls(
            obs_schema=obs_schema,
            var_schema=var_schema,
            X_dtypes=X_dtypes,
            raw_X_dtype=raw_X_dtype,
            raw_var_schema=raw_var_schema,
            obsm_dtypes=obsm_dtypes,
            varm_dtypes=varm_dtypes,
        )

    @classmethod
    def fromH5AD(
        cls,
        h5ad_file_name: str,
        *,
        default_obs_field_name: str = "obs_id",
        default_var_field_name: str = "var_id",
        default_X_layer_name: str = "data",
    ) -> "Self":
        """
        See ``fromAnnData``.
        """
        # Backed ("r") mode: only schemas are needed, not full data.
        adata = ad.read_h5ad(h5ad_file_name, "r")
        # Bug fix: forward ALL naming defaults. Previously
        # default_obs_field_name and default_var_field_name were accepted but
        # silently ignored.
        return cls.fromAnnData(
            adata,
            default_obs_field_name=default_obs_field_name,
            default_var_field_name=default_var_field_name,
            default_X_layer_name=default_X_layer_name,
        )

    @classmethod
    def fromSOMAExperiment(cls, uri: str, measurement_name: str = "RNA") -> "Self":
        """
        Constructs a pre-check signature from a SOMA experiment, which can be
        compared against another signature from AnnData/H5AD or SOMA
        experiment.
        """
        with tiledbsoma.Experiment.open(uri) as exp:
            obs_schema = _string_dict_from_arrow_schema(exp.obs.schema)

            var_schema = _string_dict_from_arrow_schema(
                exp.ms[measurement_name].var.schema
            )

            X_dtypes = {}
            for X_layer_name in exp.ms[measurement_name].X.keys():
                X = exp.ms[measurement_name].X[X_layer_name]
                X_dtypes[X_layer_name] = str(X.schema.field("soma_data").type)

            raw_X_dtype = None
            raw_var_schema = None
            if "raw" in exp.ms:
                raw_var_schema = _string_dict_from_arrow_schema(
                    exp.ms["raw"].var.schema
                )

                # Bug fix: the original reused the stale loop variable
                # X_layer_name here, which was order-dependent and a NameError
                # when the measurement had no X layers. The ingestor stores
                # adata.raw.X under the default layer name "data" (matching
                # default_X_layer_name above) -- NOTE(review): confirm against
                # the ingestor if raw layer naming changes.
                raw_X = exp.ms["raw"].X["data"]
                raw_X_dtype = str(raw_X.schema.field("soma_data").type)

            # (The original initialized obsm_dtypes twice; once suffices.)
            obsm_dtypes: Dict[str, str] = {}
            if "obsm" in exp.ms[measurement_name]:
                for obsm_layer_name in exp.ms[measurement_name].obsm.keys():
                    obsm = exp.ms[measurement_name].obsm[obsm_layer_name]
                    obsm_dtypes[obsm_layer_name] = str(
                        obsm.schema.field("soma_data").type
                    )

            varm_dtypes: Dict[str, str] = {}
            if "varm" in exp.ms[measurement_name]:
                for varm_layer_name in exp.ms[measurement_name].varm.keys():
                    varm = exp.ms[measurement_name].varm[varm_layer_name]
                    varm_dtypes[varm_layer_name] = str(
                        varm.schema.field("soma_data").type
                    )

            return cls(
                obs_schema=obs_schema,
                var_schema=var_schema,
                X_dtypes=X_dtypes,
                raw_X_dtype=raw_X_dtype,
                raw_var_schema=raw_var_schema,
                obsm_dtypes=obsm_dtypes,
                varm_dtypes=varm_dtypes,
            )

    @classmethod
    def compatible(cls, signatures: Dict[str, "Self"]) -> Tuple[bool, Optional[str]]:
        """
        Determines if any number of signatures from SOMA experiment or
        AnnData/H5AD will be safe from schema-incompatibility at ingestion
        time. On success, the second element is None; on failure, it can be
        used for reporting to the user.
        """
        # Zero or one signature is trivially compatible.
        if len(signatures) < 2:
            return (True, None)
        # Pairwise-compare everything against the first signature; since
        # compatibility is currently exact equality, this is transitive.
        names = list(signatures.keys())
        first_name = names[0]
        siga = signatures[first_name]
        for other_name in names[1:]:
            sigb = signatures[other_name]
            if not siga.compatibleWith(sigb):
                msg = f"Incompatible signatures {first_name!r}, {other_name!r}:\n{siga.toJSON()}\n{sigb.toJSON()}"
                return (False, msg)
        return (True, None)

    def compatibleWith(self, other: "Self") -> bool:
        """
        Pairwise helper method for ``compatible``. Reasons for incompatibility
        are currently advised to be handled a level up by simply showing the
        user the failed signature pair.
        """
        if self.obs_schema != other.obs_schema:
            return False

        if self.var_schema != other.var_schema:
            return False

        if self.X_dtypes != other.X_dtypes:
            return False

        if self.raw_X_dtype != other.raw_X_dtype:
            return False
        if self.raw_var_schema != other.raw_var_schema:
            return False

        if self.obsm_dtypes != other.obsm_dtypes:
            return False
        if self.varm_dtypes != other.varm_dtypes:
            return False

        return True

    def toJSON(self) -> str:
        """Presents a signature as JSON which is suitable for distributed logging."""
        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

    @classmethod
    def fromJSON(cls, s: str) -> "Self":
        """Inverse of ``toJSON``."""
        dikt = json.loads(s)
        # Keyword construction: the original positional form silently coupled
        # this to the dataclass field declaration order.
        return cls(
            obs_schema=dikt["obs_schema"],
            var_schema=dikt["var_schema"],
            raw_var_schema=dikt["raw_var_schema"],
            X_dtypes=dikt["X_dtypes"],
            raw_X_dtype=dikt["raw_X_dtype"],
            obsm_dtypes=dikt["obsm_dtypes"],
            varm_dtypes=dikt["varm_dtypes"],
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import tempfile | ||
from pathlib import Path | ||
|
||
import anndata as ad | ||
import pytest | ||
|
||
import tiledbsoma.io | ||
import tiledbsoma.io.registration.signatures as signatures | ||
|
||
# Directory containing this test module; canned test data is resolved relative to it.
HERE = Path(__file__).parent
|
||
|
||
@pytest.fixture
def canned_h5ad_file():
    """Path to the canned pbmc-small H5AD test-data file."""
    # The unused `request` parameter was dropped: this fixture needs no
    # parametrization context, and fixtures are resolved by name, so callers
    # are unaffected.
    input_path = HERE.parent / "testdata/pbmc-small.h5ad"
    return input_path
|
||
|
||
@pytest.fixture
def canned_anndata(canned_h5ad_file):
    """The canned H5AD test file, loaded into an in-memory AnnData object."""
    adata = ad.read_h5ad(canned_h5ad_file)
    return adata
|
||
|
||
def test_signature_serdes(canned_h5ad_file, canned_anndata):
    """Round-trips signatures through JSON and checks that the H5AD, AnnData,
    and SOMA-experiment forms of the same data produce identical signatures."""
    sig = signatures.Signature.fromH5AD(canned_h5ad_file.as_posix())
    text1 = sig.toJSON()
    assert "obs_schema" in text1
    assert "var_schema" in text1
    assert sig == signatures.Signature.fromJSON(text1)

    sig = signatures.Signature.fromAnnData(canned_anndata)
    text2 = sig.toJSON()
    assert sig == signatures.Signature.fromJSON(text2)

    assert text1 == text2

    # Use a context manager so the temporary experiment directory is removed
    # even on assertion failure -- the original created a TemporaryDirectory
    # object and relied on garbage collection for cleanup.
    with tempfile.TemporaryDirectory() as output_path:
        uri = tiledbsoma.io.from_anndata(output_path, canned_anndata, "RNA")
        sig = signatures.Signature.fromSOMAExperiment(uri)
        text3 = sig.toJSON()
        assert sig == signatures.Signature.fromJSON(text3)

        assert text1 == text3
|
||
|
||
def test_compatible(canned_anndata):
    """Exercises Signature.compatible across zero, one, and multiple inputs,
    including a deliberately incompatible (column-dropped) AnnData."""
    # Check that zero inputs result in zero incompatibility
    ok, msg = signatures.Signature.compatible({})
    assert ok
    assert msg is None

    sig1 = signatures.Signature.fromAnnData(canned_anndata)

    # Context manager so the temporary experiment directory is cleaned up even
    # on assertion failure (the original relied on GC of the
    # TemporaryDirectory object). The signature is read before the directory
    # is removed.
    with tempfile.TemporaryDirectory() as output_path:
        uri = tiledbsoma.io.from_anndata(output_path, canned_anndata, "RNA")
        sig2 = signatures.Signature.fromSOMAExperiment(uri)

    # Check that single inputs result in zero incompatibility
    ok, msg = signatures.Signature.compatible({"anndata": sig1})
    assert ok
    assert msg is None

    ok, msg = signatures.Signature.compatible({"experiment": sig2})
    assert ok
    assert msg is None

    # Check that AnnData/H5AD is compatible with itself; likewise with SOMA Experiment
    ok, msg = signatures.Signature.compatible({"anndata": sig1, "same": sig1})
    assert ok
    assert msg is None

    ok, msg = signatures.Signature.compatible({"experiment": sig2, "same": sig2})
    assert ok
    assert msg is None

    # Check compatibility of identical AnnData / SOMA experiment.
    ok, msg = signatures.Signature.compatible({"anndata": sig1, "experiment": sig2})
    assert ok
    assert msg is None

    # Check incompatibility of modified AnnData. Copy first: the original
    # deleted a column from the fixture-provided object in place, which is
    # fragile if the fixture's scope ever widens beyond function scope.
    adata3 = canned_anndata.copy()
    del adata3.obs["groups"]
    sig3 = signatures.Signature.fromAnnData(adata3)
    ok, msg = signatures.Signature.compatible({"orig": sig1, "anndata3": sig3})
    assert not ok
    assert msg is not None