Fix Pyarrow Backend Scanning (#3265)
* offset tests

* list offset

* apply list scan fixes

* add struct fixes

* add map tests
mxwli committed Apr 12, 2024
1 parent 02b0e99 commit 919e109
Showing 3 changed files with 116 additions and 13 deletions.
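Background for the diffs below (not from the commit itself): a pyarrow slice is a zero-copy view that keeps the parent buffers and only records a start offset, so a sliced array used as the child of a dictionary, list, struct, or map column reaches the scanner with a non-zero child offset that has to be added on top of the parent's own offsets. A minimal illustrative sketch in Python:

import pyarrow as pa

values = pa.array([10, 20, 30, 40, 50])
child = values.slice(2, 3)   # zero-copy view over the same buffer
print(child.offset)          # 2: only a start offset is recorded
print(child.to_pylist())     # [30, 40, 50]

The new tests construct exactly this situation by slicing the child arrays before assembling the columns; the C++ changes make the scanners and the null-mask tree honor the recorded child offsets.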
18 changes: 9 additions & 9 deletions src/common/arrow/arrow_array_scan.cpp
@@ -164,17 +164,17 @@ static void scanArrowArrayList(const ArrowSchema* schema, const ArrowArray* arra
     uint64_t auxDstPosition = 0;
     for (uint64_t i = 0; i < count; i++) {
         auto curOffset = offsets[i], nextOffset = offsets[i + 1];
-        if (!mask->isNull(i)) {
-            auto newEntry = ListVector::addList(&outputVector, nextOffset - curOffset);
-            outputVector.setValue<list_entry_t>(i + dstOffset, newEntry);
-            if (i == 0) {
-                auxDstPosition = newEntry.offset;
-            }
+        // don't check for validity, since we still need to update the offsets
+        auto newEntry = ListVector::addList(&outputVector, nextOffset - curOffset);
+        outputVector.setValue<list_entry_t>(i + dstOffset, newEntry);
+        if (i == 0) {
+            auxDstPosition = newEntry.offset;
         }
     }
     ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector);
     ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer,
-        mask->getChild(0), offsets[0], auxDstPosition, offsets[count] - offsets[0]);
+        mask->getChild(0), offsets[0] + array->children[0]->offset, auxDstPosition,
+        offsets[count] - offsets[0]);
 }

 template<typename offsetsT>
@@ -222,8 +222,8 @@ static void scanArrowArrayStruct(const ArrowSchema* schema, const ArrowArray* ar
     }
     for (int64_t j = 0; j < schema->n_children; j++) {
         ArrowConverter::fromArrowArray(schema->children[j], array->children[j],
-            *StructVector::getFieldVector(&outputVector, j).get(), mask->getChild(j), srcOffset,
-            dstOffset, count);
+            *StructVector::getFieldVector(&outputVector, j).get(), mask->getChild(j),
+            srcOffset + array->children[j]->offset, dstOffset, count);
     }
 }

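For reference, a hypothetical Python helper (illustrative names, not code from the repository) showing the index arithmetic the list fix corresponds to: the values of list element i start at the child's own offset plus offsets[i], which is why the scan now passes offsets[0] + array->children[0]->offset; the struct fix applies the same shift to srcOffset for every field child.

def list_element(child_values, child_offset, offsets, i):
    # values of list i live at child_offset + offsets[i] .. child_offset + offsets[i + 1]
    return child_values[child_offset + offsets[i] : child_offset + offsets[i + 1]]

child_values = list(range(100))                     # child buffer as exposed through the C interface
print(list_element(child_values, 2, [0, 3, 5], 0))  # [2, 3, 4]
print(list_element(child_values, 2, [0, 3, 5], 1))  # [5, 6]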
8 changes: 4 additions & 4 deletions src/common/arrow/arrow_null_mask_tree.cpp
@@ -56,15 +56,15 @@ void ArrowNullMaskTree::scanListPushDown(const ArrowSchema* schema, const ArrowA
         pushDownMask.setNullFromRange(offsets[i] - offsets[0], offsets[i + 1] - offsets[i],
             isNull(i));
     }
-    children->push_back(ArrowNullMaskTree(schema->children[0], array->children[0], offsets[0],
-        auxiliaryLength, &pushDownMask));
+    children->push_back(ArrowNullMaskTree(schema->children[0], array->children[0],
+        offsets[0] + array->children[0]->offset, auxiliaryLength, &pushDownMask));
 }

 void ArrowNullMaskTree::scanStructPushDown(const ArrowSchema* schema, const ArrowArray* array,
     uint64_t srcOffset, uint64_t count) {
     for (int64_t i = 0; i < array->n_children; i++) {
-        children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i], srcOffset,
-            count, mask.get()));
+        children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i],
+            srcOffset + array->children[i]->offset, count, mask.get()));
     }
 }

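The null-mask tree needs the same shift so that validity bits stay aligned with the values they describe. A small sketch assuming Arrow's least-significant-bit-first validity bitmaps (illustrative only, not repository code):

def is_valid(bitmap: bytes, child_offset: int, i: int) -> bool:
    # element i of a child sliced at child_offset maps to bit child_offset + i,
    # mirroring the srcOffset + array->children[i]->offset change above
    pos = child_offset + i
    return bool(bitmap[pos // 8] & (1 << (pos % 8)))

bitmap = bytes([0b00000101])   # bits 0 and 2 set: elements 0 and 2 of the unsliced child are valid
print(is_valid(bitmap, 1, 1))  # True  -> bit 2
print(is_valid(bitmap, 1, 0))  # False -> bit 1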
103 changes: 103 additions & 0 deletions tools/python_api/test/test_df_pyarrow.py
@@ -233,6 +233,29 @@ def test_pyarrow_dict(conn_db_readonly : ConnDB) -> None:
        for expected, actual in zip(df[colname], result[colname]):
            assert expected == actual

def test_pyarrow_dict_offset(conn_db_readonly : ConnDB) -> None:
    conn, db = conn_db_readonly
    random.seed(100)
    datalength = 4000
    index = pa.array(range(datalength), type=pa.int64())
    indices = pa.array([random.randint(0, 2) for _ in range(datalength)])
    dictionary = pa.array([1, 2, 3, 4])
    col1 = pa.DictionaryArray.from_arrays(indices, dictionary.slice(1, 3))
    df = pd.DataFrame({
        'index': arrowtopd(index),
        'col1': arrowtopd(col1)
    })
    result = conn.execute("LOAD FROM df RETURN * ORDER BY index")
    idx = 0
    while result.has_next():
        assert idx < len(index)
        nxt = result.get_next()
        proc = [idx, col1[idx].as_py()]
        assert proc == nxt
        idx += 1

    assert idx == len(index)

def test_pyarrow_list(conn_db_readonly : ConnDB) -> None:
    conn, db = conn_db_readonly
    random.seed(100)
@@ -259,6 +282,31 @@ def test_pyarrow_list(conn_db_readonly : ConnDB) -> None:

    assert idx == len(index)

def test_pyarrow_list_offset(conn_db_readonly : ConnDB) -> None:
    conn, db = conn_db_readonly
    random.seed(100)
    datalength = 50
    childlength = 5
    index = pa.array(range(datalength))
    values = pa.array([generate_primitive('int32[pyarrow]') for _ in range(datalength * childlength + 2)])
    offsets = pa.array(sorted([random.randint(0, datalength * childlength + 1) for _ in range(datalength + 1)]))
    mask = pa.array([random.choice([True, False]) for _ in range(datalength)])
    col1 = pa.ListArray.from_arrays(values=values.slice(2, datalength * childlength), offsets=offsets, mask=mask)
    df = pd.DataFrame({
        'index': arrowtopd(index),
        'col1': arrowtopd(col1),
    })
    result = conn.execute('LOAD FROM df RETURN * ORDER BY index')
    idx = 0
    while result.has_next():
        assert idx < len(index)
        nxt = result.get_next()
        proc = [idx, col1[idx].as_py()]
        assert proc == nxt
        idx += 1

    assert idx == len(index)

def test_pyarrow_struct(conn_db_readonly : ConnDB) -> None:
    conn, db = conn_db_readonly
    random.seed(100)
@@ -287,6 +335,32 @@ def test_pyarrow_struct(conn_db_readonly : ConnDB) -> None:

    assert idx == len(index)

def test_pyarrow_struct_offset(conn_db_readonly : ConnDB) -> None:
    conn, db = conn_db_readonly
    random.seed(100)
    datalength = 4096
    index = pa.array(range(datalength))
    val1 = pa.array([generate_primitive('int32[pyarrow]') for _ in range(datalength+1)])
    val2 = pa.array([generate_primitive('bool[pyarrow]') for _ in range(datalength+2)])
    val3 = pa.array([generate_string(random.randint(5, 10)) for _ in range(datalength+3)])
    mask = pa.array([random.choice([True, False]) for _ in range(datalength)])
    col1 = pa.StructArray.from_arrays([val1.slice(1, datalength), val2.slice(2, datalength), val3.slice(3, datalength)], names=['a', 'b', 'c'], mask=mask)
    df = pd.DataFrame({
        'index': arrowtopd(index),
        'col1': arrowtopd(col1)
    })
    result = conn.execute('LOAD FROM df RETURN * ORDER BY index')
    idx = 0
    while result.has_next():
        assert idx < len(index)
        nxt = result.get_next()
        expected = [idx, col1[idx].as_py()]
        assert expected == nxt
        idx += 1

    assert idx == len(index)


def test_pyarrow_union(conn_db_readonly : ConnDB) -> None:
    pytest.skip("unions are not very well supported by kuzu in general")
    conn, db = conn_db_readonly
@@ -338,3 +412,32 @@ def test_pyarrow_map(conn_db_readonly : ConnDB) -> None:
        idx += 1

    assert idx == len(index)

def test_pyarrow_map_offset(conn_db_readonly : ConnDB) -> None:
    conn, db = conn_db_readonly
    random.seed(100)
    datalength = 50
    maplength = 5
    index = pa.array(range(datalength))
    offsets = sorted([random.randint(0, datalength * maplength + 1) for _ in range(datalength + 1)])
    offsets[25] = None
    offsets = pa.array(offsets, type=pa.int32())
    keys = pa.array([random.randint(0, (1<<31)-1) for _ in range(datalength * maplength + 1)])
    values = pa.array([generate_primitive('int64[pyarrow]') for _ in range(datalength * maplength + 1)])
    _col1 = pa.MapArray.from_arrays(offsets, keys.slice(1, datalength * maplength), values.slice(1, datalength * maplength))
    col1 = _col1.slice(2, 48)
    df = pd.DataFrame({
        'index': arrowtopd(index.slice(0, 48)),
        'col1': arrowtopd(col1),
    })
    result = conn.execute('LOAD FROM df RETURN * ORDER BY index')
    idx = 0
    while result.has_next():
        assert idx < len(index)
        nxt = result.get_next()
        expected = [idx, None if col1[idx].as_py() is None else dict(col1[idx].as_py())]
        assert expected == nxt
        idx += 1

    assert idx == 48
