Skip to content

Commit

Permalink
[python] [NO MERGE UNTIL 1.14] Remove tiledb-py dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
nguyenv committed Aug 12, 2024
1 parent 9e2ae24 commit 767c0b0
Show file tree
Hide file tree
Showing 24 changed files with 303 additions and 325 deletions.
2 changes: 1 addition & 1 deletion apis/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def run(self):
"src/tiledbsoma/common.cc",
"src/tiledbsoma/reindexer.cc",
"src/tiledbsoma/query_condition.cc",
"src/tiledbsoma/vfs.cc",
"src/tiledbsoma/soma_context.cc",
"src/tiledbsoma/soma_array.cc",
"src/tiledbsoma/soma_object.cc",
Expand Down Expand Up @@ -343,7 +344,6 @@ def run(self):
"scipy",
# Note: the somacore version is in .pre-commit-config.yaml too
"somacore==1.0.14",
"tiledb~=0.31.0",
"typing-extensions", # Note "-" even though `import typing_extensions`
],
extras_require={
Expand Down
2 changes: 2 additions & 0 deletions apis/python/src/tiledbsoma/_measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class Measurement( # type: ignore[misc] # __eq__ false positive
__slots__ = ()
_wrapper_type = _tdb_handles.MeasurementWrapper

_wrapper_type = _tdb_handles.MeasurementWrapper

_subclass_constrained_soma_types = {
"var": ("SOMADataFrame",),
"X": ("SOMACollection",),
Expand Down
5 changes: 4 additions & 1 deletion apis/python/src/tiledbsoma/_tdb_handles.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ def _opener(
timestamp: int,
) -> clib.SOMAArray:
open_mode = clib.OpenMode.read if mode == "r" else clib.OpenMode.write

return cls._ARRAY_WRAPPED_TYPE.open(
uri,
mode=open_mode,
Expand All @@ -361,6 +360,10 @@ def _do_initial_reads(self, reader: RawHandle) -> None:
def schema(self) -> pa.Schema:
return self._handle.schema

@property
def config_options(self) -> clib.PlatformConfig:
return self._handle.config_options

@property
def meta(self) -> "MetadataWrapper":
return self.metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ def from_isolated_h5ad(
experiment, not in append mode, but allowing us to still have the bulk of the ingestor code
to be non-duplicated between non-append mode and append mode.
"""
tiledb_ctx = None if context is None else context.tiledb_ctx
with read_h5ad(h5ad_file_name, mode="r", ctx=tiledb_ctx) as adata:
with read_h5ad(h5ad_file_name, mode="r", ctx=context) as adata:
return cls.from_isolated_anndata(
adata,
measurement_name=measurement_name,
Expand Down Expand Up @@ -434,8 +433,7 @@ def from_h5ad_append_on_experiment(
"""Extends registration data to one more H5AD input file."""
tiledbsoma.logging.logger.info(f"Registration: registering {h5ad_file_name}.")

tiledb_ctx = None if context is None else context.tiledb_ctx
with read_h5ad(h5ad_file_name, mode="r", ctx=tiledb_ctx) as adata:
with read_h5ad(h5ad_file_name, mode="r", ctx=context) as adata:
return cls.from_anndata_append_on_experiment(
adata,
previous,
Expand Down
10 changes: 6 additions & 4 deletions apis/python/src/tiledbsoma/io/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import pyarrow as pa
from anndata._core import file_backing

import tiledb

from .. import pytiledbsoma as clib
from .._exception import SOMAError
from .._types import Path
from ..options import SOMATileDBContext

_pa_type_to_str_fmt = {
pa.string(): "U",
Expand All @@ -42,12 +42,14 @@

@contextmanager
def read_h5ad(
input_path: Path, *, mode: str = "r", ctx: Optional[tiledb.Ctx] = None
input_path: Path, *, mode: str = "r", ctx: Optional[SOMATileDBContext] = None
) -> Iterator[ad.AnnData]:
"""
This lets us ingest H5AD with "r" (backed mode) from S3 URIs.
"""
input_handle = tiledb.VFS(ctx=ctx).open(input_path)
ctx = ctx or SOMATileDBContext()
vfs = clib.VFS(ctx.native_context)
input_handle = clib.VFSFilebuf(vfs).open(str(input_path))
try:
with _hack_patch_anndata():
anndata = ad.read_h5ad(_FSPathWrapper(input_handle, input_path), mode)
Expand Down
67 changes: 25 additions & 42 deletions apis/python/src/tiledbsoma/io/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
eta,
logging,
)
from .. import pytiledbsoma as clib
from .._arrow_types import df_to_arrow
from .._collection import AnyTileDBCollection, CollectionBase
from .._common_nd_array import NDArray
Expand Down Expand Up @@ -101,7 +100,7 @@
signatures,
)
from ._registration.signatures import OriginalIndexMetadata, _prepare_df_for_ingest
from ._util import get_arrow_str_format, read_h5ad
from ._util import read_h5ad

_NDArr = TypeVar("_NDArr", bound=NDArray)
_TDBO = TypeVar("_TDBO", bound=SOMAObject[RawHandle])
Expand Down Expand Up @@ -359,7 +358,7 @@ def from_h5ad(

logging.log_io(None, f"START READING {input_path}")

with read_h5ad(input_path, mode="r", ctx=context.tiledb_ctx) as anndata:
with read_h5ad(input_path, mode="r", ctx=context) as anndata:
logging.log_io(None, _util.format_elapsed(s, f"FINISH READING {input_path}"))

uri = from_anndata(
Expand Down Expand Up @@ -1491,6 +1490,21 @@ def _update_dataframe(
new_data, default_index_name
)

old_keys = set(old_sig.keys())
new_keys = set(new_sig.keys())
common_keys = old_keys.intersection(new_keys)

msgs = []
for key in common_keys:
old_type = old_sig[key]
new_type = new_sig[key]

if old_type != new_type:
msgs.append(f"{key} type {old_type} != {new_type}")
if msgs:
msg = ", ".join(msgs)
raise ValueError(f"unsupported type updates: {msg}")

with DataFrame.open(
sdf.uri, mode="r", context=context, platform_config=platform_config
) as sdf_r:
Expand Down Expand Up @@ -1522,45 +1536,14 @@ def _update_dataframe(
f"{caller_name}: old data soma_joinid must be [0,{num_old_data}), found {len(jid_diffs)} diffs: {', '.join(jid_diff_strs)}"
)

old_keys = set(old_sig.keys())
new_keys = set(new_sig.keys())
drop_keys = old_keys.difference(new_keys)
add_keys = new_keys.difference(old_keys)
common_keys = old_keys.intersection(new_keys)

msgs = []
for key in common_keys:
old_type = old_sig[key]
new_type = new_sig[key]

if old_type != new_type:
msgs.append(f"{key} type {old_type} != {new_type}")
if msgs:
msg = ", ".join(msgs)
raise ValueError(f"unsupported type updates: {msg}")

arrow_table = df_to_arrow(new_data)
arrow_schema = arrow_table.schema.remove_metadata()

add_attrs = dict()
add_enmrs = dict()
for add_key in add_keys:
# Don't directly use the new dataframe's dtypes. Go through the
# to-Arrow-schema logic, and back, as this recapitulates the original
# schema-creation logic.
atype = arrow_schema.field(add_key).type
if pa.types.is_dictionary(arrow_table.schema.field(add_key).type):
add_attrs[add_key] = get_arrow_str_format(atype.index_type)
add_enmrs[add_key] = (
get_arrow_str_format(atype.value_type),
atype.ordered,
)
else:
add_attrs[add_key] = get_arrow_str_format(atype)

clib._update_dataframe(
sdf.uri, sdf.context.native_context, list(drop_keys), add_attrs, add_enmrs
)
new_data.reset_index(inplace=True)
if default_index_name is not None:
if default_index_name in new_data:
if "index" in new_data:
new_data.drop(columns=["index"], inplace=True)
else:
new_data.rename(columns={"index": default_index_name}, inplace=True)
sdf_r._handle._handle.update(df_to_arrow(new_data).schema)

_write_dataframe(
df_uri=sdf.uri,
Expand Down
80 changes: 9 additions & 71 deletions apis/python/src/tiledbsoma/options/_soma_tiledb_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,17 @@
import functools
import threading
import time
import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Literal, Mapping, Optional, Union

from somacore import ContextBase
from typing_extensions import Self

import tiledb

from .. import pytiledbsoma as clib
from .._general_utilities import assert_version_before
from .._types import OpenTimestamp
from .._util import ms_to_datetime, to_timestamp_ms


def _warn_ctx_deprecation() -> None:
assert_version_before(1, 14)
warnings.warn(
"tiledb_ctx is now deprecated for removal in 1.14. "
"Use tiledb_config instead by passing "
"SOMATileDBContext(tiledb_config=ctx.config().dict()).",
DeprecationWarning,
stacklevel=3,
)


def _default_config(
override: Mapping[str, Union[str, float]]
) -> Dict[str, Union[str, float]]:
Expand All @@ -51,9 +36,9 @@ def _default_config(


@functools.lru_cache(maxsize=None)
def _default_global_ctx() -> tiledb.Ctx:
def _default_global_native_context() -> clib.SOMAContext:
"""Lazily builds a default TileDB Context with the default config."""
return tiledb.Ctx(_default_config({}))
return clib.SOMAContext({k: str(v) for k, v in _default_config({}).items()})


def _maybe_timestamp_ms(input: Optional[OpenTimestamp]) -> Optional[int]:
Expand Down Expand Up @@ -81,7 +66,6 @@ class SOMATileDBContext(ContextBase):

def __init__(
self,
tiledb_ctx: Optional[tiledb.Ctx] = None,
tiledb_config: Optional[Dict[str, Union[str, float]]] = None,
timestamp: Optional[OpenTimestamp] = None,
threadpool: Optional[ThreadPoolExecutor] = None,
Expand Down Expand Up @@ -133,27 +117,12 @@ def __init__(
provided, a new ThreadPoolExecutor will be created with
default settings.
"""
if tiledb_ctx is not None:
_warn_ctx_deprecation()

if tiledb_ctx is not None and tiledb_config is not None:
raise ValueError(
"only one of tiledb_ctx or tiledb_config"
" may be set when constructing a SOMATileDBContext"
)
self._lock = threading.Lock()
"""A lock to ensure single initialization of ``_tiledb_ctx``."""
self._initial_config = (
self._initial_config: Optional[Dict[str, Union[str, float]]] = (
None if tiledb_config is None else _default_config(tiledb_config)
)

"""A dictionary of options to override the default TileDB config.
This includes both the user-provided options and the default options
that we provide to TileDB. If this is unset, then either we were
provided with a TileDB Ctx, or we need to use The Default Global Ctx.
"""
self._tiledb_ctx = tiledb_ctx
"""The TileDB context to use, either provided or lazily constructed."""
self._timestamp_ms = _maybe_timestamp_ms(timestamp)

Expand Down Expand Up @@ -184,25 +153,14 @@ def native_context(self) -> clib.SOMAContext:
"""The C++ SOMAContext for this SOMA context."""
with self._lock:
if self._native_context is None:
cfg = self._internal_tiledb_config()
self._native_context = clib.SOMAContext(
{k: str(v) for k, v in cfg.items()}
)
return self._native_context

@property
def tiledb_ctx(self) -> tiledb.Ctx:
"""The TileDB-Py Context for this SOMA context."""
_warn_ctx_deprecation()

with self._lock:
if self._tiledb_ctx is None:
if self._initial_config is None:
# Special case: we need to use the One Global Default.
self._tiledb_ctx = _default_global_ctx()
self._native_context = _default_global_native_context()
else:
self._tiledb_ctx = tiledb.Ctx(self._initial_config)
return self._tiledb_ctx
cfg = self._internal_tiledb_config()
self._native_context = clib.SOMAContext(
{k: str(v) for k, v in cfg.items()}
)
return self._native_context

@property
def tiledb_config(self) -> Dict[str, Union[str, float]]:
Expand All @@ -228,11 +186,6 @@ def _internal_tiledb_config(self) -> Dict[str, Union[str, float]]:
if self._native_context is not None:
return dict(self._native_context.config())

# We have TileDB Context. Return its actual config.
# TODO This block will be deleted once tiledb_ctx is removed in 1.14
if self._tiledb_ctx is not None:
return dict(self._tiledb_ctx.config())

# Our context has not yet been built.
# We return what will be passed into the context.
return (
Expand All @@ -245,7 +198,6 @@ def replace(
self,
*,
tiledb_config: Optional[Dict[str, Any]] = None,
tiledb_ctx: Optional[tiledb.Ctx] = None,
timestamp: Union[None, OpenTimestamp, _Unset] = _UNSET,
threadpool: Union[None, ThreadPoolExecutor, _Unset] = _UNSET,
) -> Self:
Expand Down Expand Up @@ -277,15 +229,7 @@ def replace(
... tiledb_config={"vfs.s3.region": None})
"""
with self._lock:
if tiledb_ctx is not None:
_warn_ctx_deprecation()

if tiledb_config is not None:
if tiledb_ctx:
raise ValueError(
"Either tiledb_config or tiledb_ctx may be provided"
" to replace(), but not both."
)
new_config = self._internal_tiledb_config()
new_config.update(tiledb_config)
tiledb_config = {k: v for (k, v) in new_config.items() if v is not None}
Expand All @@ -300,7 +244,6 @@ def replace(
assert timestamp is None or isinstance(timestamp, (datetime.datetime, int))
return type(self)(
tiledb_config=tiledb_config,
tiledb_ctx=tiledb_ctx,
timestamp=timestamp,
threadpool=threadpool,
)
Expand All @@ -325,11 +268,6 @@ def _validate_soma_tiledb_context(context: Any) -> SOMATileDBContext:
if context is None:
return SOMATileDBContext()

if isinstance(context, tiledb.Ctx):
raise TypeError(
"context is a tiledb.Ctx, not a SOMATileDBContext -- please wrap it in tiledbsoma.SOMATileDBContext(...)"
)

if not isinstance(context, SOMATileDBContext):
raise TypeError("context is not a SOMATileDBContext")

Expand Down
Loading

0 comments on commit 767c0b0

Please sign in to comment.