diff --git a/cytotable/convert.py b/cytotable/convert.py
index af118b92..cf1fe53d 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -35,7 +35,9 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
     import pathlib
 
-    from cytotable.utils import _duckdb_reader
+    import duckdb
+
+    from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet
 
     source_path = source["source_path"]
     source_type = str(pathlib.Path(source_path).suffix).lower()
 
@@ -47,14 +49,16 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
         else f"sqlite_scan('{source_path}', '{source['table_name']}')"
     )
 
-    # query top 5 results from table and use pragma_storage_info() to
-    # gather duckdb interpreted data typing
-    select_query = f"""
+    # Query top 5 results from table and use pragma_storage_info() to
+    # gather duckdb interpreted data typing. We gather 5 values for
+    # each column to help with type inference (where smaller sets
+    # may yield lower data type accuracy for the full table).
+    select_query = """
     /* we create an in-mem table for later use with the pragma_storage_info call
     as this call only functions with materialized tables and not views or related */
     CREATE TABLE column_details AS
         (SELECT *
-        FROM {select_source}
+        FROM &select_source
         LIMIT 5
         );
 
@@ -64,13 +68,47 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
         column_name,
         segment_type as column_dtype
     FROM pragma_storage_info('column_details')
-    /* avoid duplicate entries in the form of VALIDITY segment_types */
     WHERE segment_type != 'VALIDITY';
     """
 
 
-    # perform the query and create a list of dictionaries with the column data for table
-    return _duckdb_reader().execute(select_query).arrow().to_pylist()
+    # attempt to read the column details through duckdb,
+    # with exception handling to read mixed-type data
+    # using sqlite3 and a special utility function
+    try:
+        # perform the query with a new duckdb connection and
+        # create a list of dictionaries with the column
+        # details for the table
+        return (
+            _duckdb_reader()
+            .execute(select_query.replace("&select_source", select_source))
+            .arrow()
+            .to_pylist()
+        )
+
+    except duckdb.Error as e:
+        # if we see a mismatched type error,
+        # run a more nuanced query through sqlite
+        # to handle the mixed types
+        if "Mismatch Type Error" in str(e) and source_type == ".sqlite":
+            arrow_data_tbl = _sqlite_mixed_type_query_to_parquet(
+                source_path=str(source["source_path"]),
+                table_name=str(source["table_name"]),
+                # chunk size is set to 5 as a limit similar
+                # to the SQL LIMIT within select_query above
+                chunk_size=5,
+                # offset is set to 0 to start at the first
+                # row result from the table
+                offset=0,
+            )
+            return (
+                _duckdb_reader()
+                .execute(select_query.replace("&select_source", "arrow_data_tbl"))
+                .arrow()
+                .to_pylist()
+            )
+        else:
+            raise
 
 
 @python_app
@@ -238,6 +276,7 @@ def _source_chunk_to_parquet(
 
     import duckdb
     from cloudpathlib import AnyPath
+    from pyarrow import parquet
 
     from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet
 
@@ -292,13 +331,17 @@ def _source_chunk_to_parquet(
             "Mismatch Type Error" in str(e)
             and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
         ):
-            result_filepath = _sqlite_mixed_type_query_to_parquet(
-                source_path=str(source["source_path"]),
-                table_name=str(source["table_name"]),
-                chunk_size=chunk_size,
-                offset=offset,
-                result_filepath=result_filepath,
+            parquet.write_table(
+                table=_sqlite_mixed_type_query_to_parquet(
+                    source_path=str(source["source_path"]),
+                    table_name=str(source["table_name"]),
+                    chunk_size=chunk_size,
+                    offset=offset,
+                ),
+                where=result_filepath,
             )
+        else:
+            raise
 
     # return the filepath for the chunked output file
     return result_filepath
diff --git a/cytotable/utils.py b/cytotable/utils.py
index d13286a3..324b780c 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -162,12 +162,11 @@ def _sqlite_mixed_type_query_to_parquet(
     table_name: str,
     chunk_size: int,
     offset: int,
-    result_filepath: str,
-) -> str:
+) -> "pyarrow.Table":
     """
     Performs SQLite table data extraction where one or many
     columns include data values of potentially mismatched type
-    such that the data may be exported to Arrow and a Parquet file.
+    such that the data may be exported to Arrow for later use.
 
     Args:
         source_path: str:
@@ -178,17 +177,14 @@ def _sqlite_mixed_type_query_to_parquet(
             Row count to use for chunked output.
         offset: int:
             The offset for chunking the data from source.
-        dest_path: str:
-            Path to store the output data.
 
     Returns:
-        str:
-            The resulting filepath for the table exported to parquet.
+        pyarrow.Table:
+            The resulting arrow table for the data.
     """
     import sqlite3
 
     import pyarrow as pa
-    import pyarrow.parquet as parquet
 
     # open sqlite3 connection
     with sqlite3.connect(source_path) as conn:
@@ -234,14 +230,8 @@ def _sqlite_mixed_type_query_to_parquet(
                 for row in cursor.fetchall()
             ]
 
-        # write results to a parquet file
-        parquet.write_table(
-            table=pa.Table.from_pylist(results),
-            where=result_filepath,
-        )
-
-        # return filepath
-        return result_filepath
+        # return arrow table with results
+        return pa.Table.from_pylist(results)
 
 
 def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path:
diff --git a/tests/test_convert.py b/tests/test_convert.py
index 55497b00..9b42a4b1 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -970,36 +970,58 @@ def test_sqlite_mixed_type_query_to_parquet(
     # run a more nuanced query through sqlite
    # to handle the mixed types
     if "Mismatch Type Error" in str(duckdb_exc):
-        result = _sqlite_mixed_type_query_to_parquet(
-            source_path=example_sqlite_mixed_types_database,
-            table_name=table_name,
-            chunk_size=2,
-            offset=0,
-            result_filepath=result_filepath,
+        parquet.write_table(
+            table=_sqlite_mixed_type_query_to_parquet(
+                source_path=example_sqlite_mixed_types_database,
+                table_name=table_name,
+                chunk_size=2,
+                offset=0,
+            ),
+            where=result_filepath,
         )
 
     # check schema names
-    assert parquet.read_schema(where=result).names == [
+    assert parquet.read_schema(where=result_filepath).names == [
         "col_integer",
         "col_text",
         "col_blob",
         "col_real",
     ]
     # check schema types
-    assert parquet.read_schema(where=result).types == [
+    assert parquet.read_schema(where=result_filepath).types == [
         pa.int64(),
         pa.string(),
         pa.binary(),
         pa.float64(),
     ]
     # check the values per column
-    assert parquet.read_table(source=result).to_pydict() == {
+    assert parquet.read_table(source=result_filepath).to_pydict() == {
         "col_integer": [1, None],
         "col_text": ["sample", "sample"],
         "col_blob": [b"sample_blob", b"another_blob"],
         "col_real": [0.5, None],
     }
 
+    # run full convert on mixed type database
+    result = convert(
+        source_path=example_sqlite_mixed_types_database,
+        dest_path=result_filepath,
+        dest_datatype="parquet",
+        source_datatype="sqlite",
+        compartments=[table_name],
+        join=False,
+    )
+
+    # assert that the single table result looks like the following dictionary
+    assert parquet.read_table(
+        source=result["Tbl_a.sqlite"][0]["table"][0]
+    ).to_pydict() == {
+        "Tbl_a_col_integer": [1, None],
+        "Tbl_a_col_text": ["sample", "sample"],
+        "Tbl_a_col_blob": [b"sample_blob", b"another_blob"],
+        "Tbl_a_col_real": [0.5, None],
+    }
+
 
 def test_convert_cellprofiler_csv(
     get_tempdir: str,