diff --git a/src/common/arrow/arrow_array_scan.cpp b/src/common/arrow/arrow_array_scan.cpp index 693c38ef28..475e7ab5b0 100644 --- a/src/common/arrow/arrow_array_scan.cpp +++ b/src/common/arrow/arrow_array_scan.cpp @@ -160,16 +160,20 @@ static void scanArrowArrayVarList(const ArrowSchema* schema, const ArrowArray* a uint64_t count) { auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset; mask->copyToValueVector(&outputVector, dstOffset, count); - ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector); + uint64_t auxDstPosition = 0; for (uint64_t i = 0; i < count; i++) { auto curOffset = offsets[i], nextOffset = offsets[i + 1]; if (!mask->isNull(i)) { auto newEntry = ListVector::addList(&outputVector, nextOffset - curOffset); outputVector.setValue(i + dstOffset, newEntry); + if (i == 0) { + auxDstPosition = newEntry.offset; + } } } + ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector); ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer, - mask->getChild(0), offsets[0], 0u, offsets[count] - offsets[0]); + mask->getChild(0), offsets[0], auxDstPosition, offsets[count] - offsets[0]); } template @@ -259,8 +263,8 @@ static void scanArrowArraySparseUnion(const ArrowSchema* schema, const ArrowArra // eg. nulling out unselected children for (int8_t i = 0; i < array->n_children; i++) { ArrowConverter::fromArrowArray(schema->children[i], array->children[i], - *UnionVector::getValVector(&outputVector, i), mask->getChild(i), srcOffset, dstOffset, - count); + *StructVector::getFieldVector(&outputVector, i), mask->getChild(i), srcOffset, + dstOffset, count); } } diff --git a/src/common/arrow/arrow_null_mask_tree.cpp b/src/common/arrow/arrow_null_mask_tree.cpp index c8b3033602..ab10be19ae 100644 --- a/src/common/arrow/arrow_null_mask_tree.cpp +++ b/src/common/arrow/arrow_null_mask_tree.cpp @@ -53,7 +53,8 @@ void ArrowNullMaskTree::scanListPushDown( NullMask pushDownMask((auxiliaryLength + NullMask::NUM_BITS_PER_NULL_ENTRY - 1) >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2); for (uint64_t i = 0; i < count; i++) { - pushDownMask.setNullFromRange(offsets[i], offsets[i + 1] - offsets[i], isNull(i)); + pushDownMask.setNullFromRange( + offsets[i] - offsets[0], offsets[i + 1] - offsets[i], isNull(i)); } children->push_back(ArrowNullMaskTree( schema->children[0], array->children[0], offsets[0], auxiliaryLength, &pushDownMask)); @@ -155,9 +156,9 @@ ArrowNullMaskTree::ArrowNullMaskTree(const ArrowSchema* schema, const ArrowArray children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i], lowestOffsets[i], highestOffsets[i] - lowestOffsets[i])); } - for (auto i = srcOffset; i < srcOffset + count; i++) { - int32_t curOffset = offsets[i]; - int8_t curType = types[i]; + for (auto i = 0u; i < count; i++) { + int32_t curOffset = offsets[i + srcOffset]; + int8_t curType = types[i + srcOffset]; mask->setNull(i, children->operator[](curType).isNull(curOffset)); } } else { @@ -165,8 +166,8 @@ ArrowNullMaskTree::ArrowNullMaskTree(const ArrowSchema* schema, const ArrowArray children->push_back(ArrowNullMaskTree( schema->children[i], array->children[i], srcOffset, count)); } - for (auto i = srcOffset; i < srcOffset + count; i++) { - int8_t curType = types[i]; + for (auto i = 0u; i < count; i++) { + int8_t curType = types[i + srcOffset]; mask->setNull(i, children->operator[](curType).isNull(i)); // this isn't specified in the arrow specification, but is it valid to // compute this using a bitwise OR? diff --git a/src/common/vector/auxiliary_buffer.cpp b/src/common/vector/auxiliary_buffer.cpp index 20c4a47fd0..4904a50b27 100644 --- a/src/common/vector/auxiliary_buffer.cpp +++ b/src/common/vector/auxiliary_buffer.cpp @@ -52,7 +52,7 @@ void ListAuxiliaryBuffer::resizeDataVector(ValueVector* dataVector) { auto buffer = std::make_unique(capacity * dataVector->getNumBytesPerValue()); memcpy(buffer.get(), dataVector->valueBuffer.get(), size * dataVector->getNumBytesPerValue()); dataVector->valueBuffer = std::move(buffer); - dataVector->nullMask->resize(capacity); + dataVector->nullMask->resize(capacity); // note: allocating 64 times what is needed // If the dataVector is a struct vector, we need to resize its field vectors. if (dataVector->dataType.getPhysicalType() == PhysicalTypeID::STRUCT) { resizeStructDataVector(dataVector); diff --git a/tools/python_api/src_cpp/include/pyarrow/pyarrow_scan.h b/tools/python_api/src_cpp/include/pyarrow/pyarrow_scan.h index 237c8c3825..a1463d39ba 100644 --- a/tools/python_api/src_cpp/include/pyarrow/pyarrow_scan.h +++ b/tools/python_api/src_cpp/include/pyarrow/pyarrow_scan.h @@ -30,19 +30,16 @@ struct PyArrowTableScanSharedState final : public function::BaseScanSharedState struct PyArrowTableScanFunctionData final : public function::TableFuncBindData { std::shared_ptr schema; - std::unique_ptr table; + py::object table; uint64_t numRows; PyArrowTableScanFunctionData(std::vector columnTypes, std::shared_ptr schema, std::vector columnNames, - py::object table, uint64_t numRows) + py::handle table, uint64_t numRows) : TableFuncBindData{std::move(columnTypes), std::move(columnNames)}, - schema{std::move(schema)}, table{std::make_unique(table)}, numRows{numRows} {} + schema{std::move(schema)}, table{py::reinterpret_borrow(table)}, numRows{numRows} {} - ~PyArrowTableScanFunctionData() override { - py::gil_scoped_acquire acquire; - table.reset(); - } + ~PyArrowTableScanFunctionData() override {} std::unique_ptr copy() const override { py::gil_scoped_acquire acquire; diff --git a/tools/python_api/src_cpp/pandas/pandas_scan.cpp b/tools/python_api/src_cpp/pandas/pandas_scan.cpp index f432ab2093..054d38711a 100644 --- a/tools/python_api/src_cpp/pandas/pandas_scan.cpp +++ b/tools/python_api/src_cpp/pandas/pandas_scan.cpp @@ -159,6 +159,7 @@ static std::unique_ptr tryReplacePD(py::dict& dict, py::str std::unique_ptr replacePD(const std::string& objectName) { auto pyTableName = py::str(objectName); // Here we do an exhaustive search on the frame lineage. + py::gil_scoped_acquire acquire; auto currentFrame = importCache->inspect.currentframe()(); while (hasattr(currentFrame, "f_locals")) { auto localDict = py::reinterpret_borrow(currentFrame.attr("f_locals")); diff --git a/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp index e9739ea65a..853be429cc 100644 --- a/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp +++ b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp @@ -16,7 +16,7 @@ static std::unique_ptr bindFunc( main::ClientContext* /*context*/, TableFuncBindInput* input) { py::gil_scoped_acquire acquire; - py::object table(py::reinterpret_steal( + py::object table(py::reinterpret_borrow( reinterpret_cast(input->inputs[0].getValue()))); if (py::isinstance(table, importCache->pandas.DataFrame())) { table = importCache->pyarrow.lib.Table.from_pandas()(table); @@ -29,7 +29,7 @@ static std::unique_ptr bindFunc( auto numRows = py::len(table); auto schema = Pyarrow::bind(table, returnTypes, names); return std::make_unique( - std::move(returnTypes), std::move(schema), std::move(names), table, numRows); + std::move(returnTypes), std::move(schema), std::move(names), std::move(table), numRows); } ArrowArrayWrapper* PyArrowTableScanSharedState::getNextChunk() { @@ -46,7 +46,7 @@ static std::unique_ptr initSharedState( py::gil_scoped_acquire acquire; PyArrowTableScanFunctionData* bindData = dynamic_cast(input.bindData); - py::list batches = bindData->table->attr("to_batches")(DEFAULT_VECTOR_CAPACITY); + py::list batches = bindData->table.attr("to_batches")(DEFAULT_VECTOR_CAPACITY); std::vector> arrowArrayBatches; for (auto& i : batches) { diff --git a/tools/python_api/test/test_df_pyarrow.py b/tools/python_api/test/test_df_pyarrow.py index b88db4616b..d5b1a89926 100644 --- a/tools/python_api/test/test_df_pyarrow.py +++ b/tools/python_api/test/test_df_pyarrow.py @@ -14,11 +14,11 @@ def generate_primitive(dtype): if (dtype.startswith("bool")): return random.randrange(0, 1) == 1 if (dtype.startswith("int32")): - return random.randrange(-2147483648, 2147483648) + return random.randrange(-2147483648, 2147483647) if (dtype.startswith("int64")): - return random.randrange(-9223372036854775808, 9223372036854775808) + return random.randrange(-9223372036854775808, 9223372036854775807) if (dtype.startswith("uint64")): - return random.randrange(0, 18446744073709551616) + return random.randrange(0, 18446744073709551615) if (dtype.startswith("float32")): random_bits = random.getrandbits(32) random_bytes = struct.pack(' None: for colname in ['col1', 'col2']: for expected, actual in zip(df[colname], result[colname]): assert expected == actual + +def test_pyarrow_list(conn_db_readonly : ConnDB) -> None: + conn, db = conn_db_readonly + random.seed(100) + datalength = 50 + childlength = 5 + index = pa.array(range(datalength)) + col1 = pa.array( + [[generate_primitive('int32[pyarrow]') for x in range(random.randint(1, childlength))] if random.randint(0, 5) == 0 else None for i in range(datalength)]) + col2 = pa.array( + [[[generate_primitive('int32[pyarrow]') for x in range(random.randint(1, childlength))] for y in range(1, childlength)] if random.randint(0, 5) == 0 else None for i in range(datalength)]) + df = pd.DataFrame({ + 'index': arrowtopd(index), + 'col1': arrowtopd(col1), + 'col2': arrowtopd(col2) + }) + result = conn.execute('CALL READ_PANDAS(df) RETURN * ORDER BY index') + idx = 0 + while result.has_next(): + assert idx < len(index) + nxt = result.get_next() + proc = [idx, col1[idx].as_py(), col2[idx].as_py()] + assert proc == nxt + idx += 1 + + assert idx == len(index) + +def test_pyarrow_struct(conn_db_readonly : ConnDB) -> None: + conn, db = conn_db_readonly + random.seed(100) + datalength = 4096 + index = pa.array(range(datalength)) + col1_plaindata = [{ + 'a': generate_primitive('int32[pyarrow]'), + 'b': { 'c': generate_string(10) } if random.randint(0, 5) != 0 else None + } if random.randint(0, 5) != 0 else None for i in range(datalength)] + col1 = pa.array(col1_plaindata, pa.struct([ + ('a', pa.int32()), + ('b', pa.struct([('c', pa.string())])) + ])) + df = pd.DataFrame({ + 'index': arrowtopd(index), + 'col1': arrowtopd(col1) + }) + result = conn.execute('CALL READ_PANDAS(df) RETURN * ORDER BY index') + idx = 0 + while result.has_next(): + assert idx < len(index) + nxt = result.get_next() + expected = [idx, col1[idx].as_py()] + assert expected == nxt + idx += 1 + + assert idx == len(index) + +def test_pyarrow_union(conn_db_readonly : ConnDB) -> None: + pytest.skip("unions are not very well supported by kuzu in general") + conn, db = conn_db_readonly + random.seed(100) + datalength = 4096 + index = pa.array(range(datalength)) + type_codes = pa.array([random.randint(0, 2) for i in range(datalength)], type=pa.int8()) + arr1 = pa.array([generate_primitive('int32[pyarrow]') for i in range(datalength)], type=pa.int32()) + arr2 = pa.array([generate_string(random.randint(1, 10)) for i in range(datalength)]) + arr3 = pa.array([[generate_primitive('float32[pyarrow]') for i in range(10)] for j in range(datalength)]) + col1 = pa.UnionArray.from_sparse(type_codes, [arr1, arr2, arr3]) + df = pd.DataFrame({ + 'index': arrowtopd(index), + 'col1': arrowtopd(col1) + }) + result = conn.execute('CALL READ_PANDAS(df) RETURN * ORDER BY index') + idx = 0 + while result.has_next(): + assert idx < len(index) + nxt = result.get_next() + expected = [idx, col1[idx].as_py()] + assert expected == nxt + idx += 1 + + assert idx == len(index) +