Skip to content

Commit

Permalink
Pandas Pyarrow Backend Bugfix and Tests (#3152)
Browse files Browse the repository at this point in the history
* add more test coverage & fixes to pyarrow

* formatting fixes

* clang-tidy

* clang fix

* add missing GIL acquire
  • Loading branch information
mxwli committed Mar 27, 2024
1 parent 9ea80ec commit 73ed1ea
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 24 deletions.
12 changes: 8 additions & 4 deletions src/common/arrow/arrow_array_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,20 @@ static void scanArrowArrayVarList(const ArrowSchema* schema, const ArrowArray* a
uint64_t count) {
auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset;
mask->copyToValueVector(&outputVector, dstOffset, count);
ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector);
uint64_t auxDstPosition = 0;
for (uint64_t i = 0; i < count; i++) {
auto curOffset = offsets[i], nextOffset = offsets[i + 1];
if (!mask->isNull(i)) {
auto newEntry = ListVector::addList(&outputVector, nextOffset - curOffset);
outputVector.setValue<list_entry_t>(i + dstOffset, newEntry);
if (i == 0) {
auxDstPosition = newEntry.offset;
}
}
}
ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector);
ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer,
mask->getChild(0), offsets[0], 0u, offsets[count] - offsets[0]);
mask->getChild(0), offsets[0], auxDstPosition, offsets[count] - offsets[0]);
}

template<typename offsetsT>
Expand Down Expand Up @@ -259,8 +263,8 @@ static void scanArrowArraySparseUnion(const ArrowSchema* schema, const ArrowArra
// eg. nulling out unselected children
for (int8_t i = 0; i < array->n_children; i++) {
ArrowConverter::fromArrowArray(schema->children[i], array->children[i],
*UnionVector::getValVector(&outputVector, i), mask->getChild(i), srcOffset, dstOffset,
count);
*StructVector::getFieldVector(&outputVector, i), mask->getChild(i), srcOffset,
dstOffset, count);
}
}

Expand Down
13 changes: 7 additions & 6 deletions src/common/arrow/arrow_null_mask_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ void ArrowNullMaskTree::scanListPushDown(
NullMask pushDownMask((auxiliaryLength + NullMask::NUM_BITS_PER_NULL_ENTRY - 1) >>
NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2);
for (uint64_t i = 0; i < count; i++) {
pushDownMask.setNullFromRange(offsets[i], offsets[i + 1] - offsets[i], isNull(i));
pushDownMask.setNullFromRange(
offsets[i] - offsets[0], offsets[i + 1] - offsets[i], isNull(i));
}
children->push_back(ArrowNullMaskTree(
schema->children[0], array->children[0], offsets[0], auxiliaryLength, &pushDownMask));
Expand Down Expand Up @@ -155,18 +156,18 @@ ArrowNullMaskTree::ArrowNullMaskTree(const ArrowSchema* schema, const ArrowArray
children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i],
lowestOffsets[i], highestOffsets[i] - lowestOffsets[i]));
}
for (auto i = srcOffset; i < srcOffset + count; i++) {
int32_t curOffset = offsets[i];
int8_t curType = types[i];
for (auto i = 0u; i < count; i++) {
int32_t curOffset = offsets[i + srcOffset];
int8_t curType = types[i + srcOffset];
mask->setNull(i, children->operator[](curType).isNull(curOffset));
}
} else {
for (int64_t i = 0; i < array->n_children; i++) {
children->push_back(ArrowNullMaskTree(
schema->children[i], array->children[i], srcOffset, count));
}
for (auto i = srcOffset; i < srcOffset + count; i++) {
int8_t curType = types[i];
for (auto i = 0u; i < count; i++) {
int8_t curType = types[i + srcOffset];
mask->setNull(i, children->operator[](curType).isNull(i));
// this isn't specified in the arrow specification, but is it valid to
// compute this using a bitwise OR?
Expand Down
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void ListAuxiliaryBuffer::resizeDataVector(ValueVector* dataVector) {
auto buffer = std::make_unique<uint8_t[]>(capacity * dataVector->getNumBytesPerValue());
memcpy(buffer.get(), dataVector->valueBuffer.get(), size * dataVector->getNumBytesPerValue());
dataVector->valueBuffer = std::move(buffer);
dataVector->nullMask->resize(capacity);
dataVector->nullMask->resize(capacity); // note: allocating 64 times what is needed
// If the dataVector is a struct vector, we need to resize its field vectors.
if (dataVector->dataType.getPhysicalType() == PhysicalTypeID::STRUCT) {
resizeStructDataVector(dataVector);
Expand Down
11 changes: 4 additions & 7 deletions tools/python_api/src_cpp/include/pyarrow/pyarrow_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,16 @@ struct PyArrowTableScanSharedState final : public function::BaseScanSharedState

struct PyArrowTableScanFunctionData final : public function::TableFuncBindData {
std::shared_ptr<ArrowSchemaWrapper> schema;
std::unique_ptr<py::object> table;
py::object table;
uint64_t numRows;

PyArrowTableScanFunctionData(std::vector<common::LogicalType> columnTypes,
std::shared_ptr<ArrowSchemaWrapper> schema, std::vector<std::string> columnNames,
py::object table, uint64_t numRows)
py::handle table, uint64_t numRows)
: TableFuncBindData{std::move(columnTypes), std::move(columnNames)},
schema{std::move(schema)}, table{std::make_unique<py::object>(table)}, numRows{numRows} {}
schema{std::move(schema)}, table{py::reinterpret_borrow<py::object>(table)}, numRows{numRows} {}

~PyArrowTableScanFunctionData() override {
py::gil_scoped_acquire acquire;
table.reset();
}
~PyArrowTableScanFunctionData() override {}

std::unique_ptr<function::TableFuncBindData> copy() const override {
py::gil_scoped_acquire acquire;
Expand Down
1 change: 1 addition & 0 deletions tools/python_api/src_cpp/pandas/pandas_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ static std::unique_ptr<ScanReplacementData> tryReplacePD(py::dict& dict, py::str
std::unique_ptr<ScanReplacementData> replacePD(const std::string& objectName) {
auto pyTableName = py::str(objectName);
// Here we do an exhaustive search on the frame lineage.
py::gil_scoped_acquire acquire;
auto currentFrame = importCache->inspect.currentframe()();
while (hasattr(currentFrame, "f_locals")) {
auto localDict = py::reinterpret_borrow<py::dict>(currentFrame.attr("f_locals"));
Expand Down
6 changes: 3 additions & 3 deletions tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ static std::unique_ptr<function::TableFuncBindData> bindFunc(
main::ClientContext* /*context*/, TableFuncBindInput* input) {

py::gil_scoped_acquire acquire;
py::object table(py::reinterpret_steal<py::object>(
py::object table(py::reinterpret_borrow<py::object>(
reinterpret_cast<PyObject*>(input->inputs[0].getValue<uint8_t*>())));
if (py::isinstance(table, importCache->pandas.DataFrame())) {
table = importCache->pyarrow.lib.Table.from_pandas()(table);
Expand All @@ -29,7 +29,7 @@ static std::unique_ptr<function::TableFuncBindData> bindFunc(
auto numRows = py::len(table);
auto schema = Pyarrow::bind(table, returnTypes, names);
return std::make_unique<PyArrowTableScanFunctionData>(
std::move(returnTypes), std::move(schema), std::move(names), table, numRows);
std::move(returnTypes), std::move(schema), std::move(names), std::move(table), numRows);
}

ArrowArrayWrapper* PyArrowTableScanSharedState::getNextChunk() {
Expand All @@ -46,7 +46,7 @@ static std::unique_ptr<function::TableFuncSharedState> initSharedState(
py::gil_scoped_acquire acquire;
PyArrowTableScanFunctionData* bindData =
dynamic_cast<PyArrowTableScanFunctionData*>(input.bindData);
py::list batches = bindData->table->attr("to_batches")(DEFAULT_VECTOR_CAPACITY);
py::list batches = bindData->table.attr("to_batches")(DEFAULT_VECTOR_CAPACITY);
std::vector<std::shared_ptr<ArrowArrayWrapper>> arrowArrayBatches;

for (auto& i : batches) {
Expand Down
87 changes: 84 additions & 3 deletions tools/python_api/test/test_df_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ def generate_primitive(dtype):
if (dtype.startswith("bool")):
return random.randrange(0, 1) == 1
if (dtype.startswith("int32")):
return random.randrange(-2147483648, 2147483648)
return random.randrange(-2147483648, 2147483647)
if (dtype.startswith("int64")):
return random.randrange(-9223372036854775808, 9223372036854775808)
return random.randrange(-9223372036854775808, 9223372036854775807)
if (dtype.startswith("uint64")):
return random.randrange(0, 18446744073709551616)
return random.randrange(0, 18446744073709551615)
if (dtype.startswith("float32")):
random_bits = random.getrandbits(32)
random_bytes = struct.pack('<I', random_bits)
Expand Down Expand Up @@ -232,3 +232,84 @@ def test_pyarrow_dict(conn_db_readonly : ConnDB) -> None:
for colname in ['col1', 'col2']:
for expected, actual in zip(df[colname], result[colname]):
assert expected == actual

def test_pyarrow_list(conn_db_readonly : ConnDB) -> None:
    """Round-trip pyarrow list columns (flat list<int32> and nested
    list<list<int32>>, including NULL rows) through a pandas DataFrame
    into READ_PANDAS, verifying every scanned row against the source."""
    conn, db = conn_db_readonly
    random.seed(100)  # fixed seed: data is random but reproducible
    datalength = 50
    childlength = 5
    index = pa.array(range(datalength))
    # col1: list<int32> of random length 1..childlength.
    # NOTE(review): `== 0` makes only ~1 in 6 rows non-null (most rows are
    # None); the struct test uses `!= 0` — confirm the inversion is intended.
    col1 = pa.array(
        [[generate_primitive('int32[pyarrow]') for x in range(random.randint(1, childlength))] if random.randint(0, 5) == 0 else None for i in range(datalength)])
    # col2: list<list<int32>> — exercises nested list offset handling.
    col2 = pa.array(
        [[[generate_primitive('int32[pyarrow]') for x in range(random.randint(1, childlength))] for y in range(1, childlength)] if random.randint(0, 5) == 0 else None for i in range(datalength)])
    df = pd.DataFrame({
        'index': arrowtopd(index),
        'col1': arrowtopd(col1),
        'col2': arrowtopd(col2)
    })
    result = conn.execute('CALL READ_PANDAS(df) RETURN * ORDER BY index')
    idx = 0
    while result.has_next():
        assert idx < len(index)  # scan must not produce more rows than input
        nxt = result.get_next()
        proc = [idx, col1[idx].as_py(), col2[idx].as_py()]  # expected row
        assert proc == nxt
        idx += 1

    assert idx == len(index)  # scan must not drop rows either

def test_pyarrow_struct(conn_db_readonly : ConnDB) -> None:
    """Round-trip a pyarrow struct column (with a nested struct field and
    NULLs at both nesting levels) through READ_PANDAS and verify each row."""
    conn, db = conn_db_readonly
    random.seed(100)  # fixed seed: data is random but reproducible
    datalength = 4096
    index = pa.array(range(datalength))
    # ~5 in 6 rows are non-null; field 'b' is itself a nullable struct,
    # exercising struct-in-struct null-mask propagation.
    col1_plaindata = [{
        'a': generate_primitive('int32[pyarrow]'),
        'b': { 'c': generate_string(10) } if random.randint(0, 5) != 0 else None
    } if random.randint(0, 5) != 0 else None for i in range(datalength)]
    # Build with an explicit arrow type so field order and types are fixed.
    col1 = pa.array(col1_plaindata, pa.struct([
        ('a', pa.int32()),
        ('b', pa.struct([('c', pa.string())]))
    ]))
    df = pd.DataFrame({
        'index': arrowtopd(index),
        'col1': arrowtopd(col1)
    })
    result = conn.execute('CALL READ_PANDAS(df) RETURN * ORDER BY index')
    idx = 0
    while result.has_next():
        assert idx < len(index)  # scan must not produce more rows than input
        nxt = result.get_next()
        expected = [idx, col1[idx].as_py()]  # expected row from source data
        assert expected == nxt
        idx += 1

    assert idx == len(index)  # scan must not drop rows either

def test_pyarrow_union(conn_db_readonly : ConnDB) -> None:
    """Round-trip a sparse union column (int32 | string | list<float32>)
    through READ_PANDAS. Currently skipped: union support in kuzu is
    incomplete, so the body below is aspirational coverage."""
    pytest.skip("unions are not very well supported by kuzu in general")
    conn, db = conn_db_readonly
    random.seed(100)  # fixed seed: data is random but reproducible
    datalength = 4096
    index = pa.array(range(datalength))
    # Each row's type code (0..2) selects which child array supplies the value.
    type_codes = pa.array([random.randint(0, 2) for i in range(datalength)], type=pa.int8())
    arr1 = pa.array([generate_primitive('int32[pyarrow]') for i in range(datalength)], type=pa.int32())
    arr2 = pa.array([generate_string(random.randint(1, 10)) for i in range(datalength)])
    arr3 = pa.array([[generate_primitive('float32[pyarrow]') for i in range(10)] for j in range(datalength)])
    # Sparse union layout: every child array has full length; only the entry
    # matching the row's type code is logically selected.
    col1 = pa.UnionArray.from_sparse(type_codes, [arr1, arr2, arr3])
    df = pd.DataFrame({
        'index': arrowtopd(index),
        'col1': arrowtopd(col1)
    })
    result = conn.execute('CALL READ_PANDAS(df) RETURN * ORDER BY index')
    idx = 0
    while result.has_next():
        assert idx < len(index)  # scan must not produce more rows than input
        nxt = result.get_next()
        expected = [idx, col1[idx].as_py()]  # expected row from source data
        assert expected == nxt
        idx += 1

    assert idx == len(index)  # scan must not drop rows either

0 comments on commit 73ed1ea

Please sign in to comment.