Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix problematic to_arrow tests #3257

Merged
merged 13 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ nodejs:
python:
$(call run-cmake-release, -DBUILD_PYTHON=TRUE)

python-debug:
$(call run-cmake-debug, -DBUILD_PYTHON=TRUE)
mxwli marked this conversation as resolved.
Show resolved Hide resolved

rust:
ifeq ($(OS),Windows_NT)
set KUZU_TESTING=1
Expand Down Expand Up @@ -142,7 +145,10 @@ nodejstest: nodejs
cd tools/nodejs_api && npm test

pytest: python
cmake -E env PYTHONPATH=tools/python_api/build python3 -m pytest -v tools/python_api/test
cmake -E env PYTHONPATH=tools/python_api/build python3 -m pytest -vv tools/python_api/test

pytest-debug: python-debug
cmake -E env PYTHONPATH=tools/python_api/build python3 -m pytest -vv tools/python_api/test

rusttest: rust
cd tools/rust_api && cargo test --locked --all-features -- --test-threads=1
Expand Down
44 changes: 41 additions & 3 deletions src/common/arrow/arrow_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@ void ArrowConverter::setArrowFormatForStruct(ArrowSchemaHolder& rootHolder, Arro
}
}

void ArrowConverter::setArrowFormatForUnion(ArrowSchemaHolder& rootHolder, ArrowSchema& child,
const LogicalType& dataType) {
std::string formatStr = "+ud";
child.n_children = (std::int64_t)UnionType::getNumFields(&dataType);
rootHolder.nestedChildren.emplace_back();
rootHolder.nestedChildren.back().resize(child.n_children);
rootHolder.nestedChildrenPtr.emplace_back();
rootHolder.nestedChildrenPtr.back().resize(child.n_children);
for (auto i = 0u; i < child.n_children; i++) {
rootHolder.nestedChildrenPtr.back()[i] = &rootHolder.nestedChildren.back()[i];
}
child.children = &rootHolder.nestedChildrenPtr.back()[0];
for (auto i = 0u; i < child.n_children; i++) {
initializeChild(*child.children[i]);
auto unionFieldType = UnionType::getFieldType(&dataType, i);
auto unionFieldName = UnionType::getFieldName(&dataType, i);
child.children[i]->name = copyName(rootHolder, unionFieldName);
setArrowFormat(rootHolder, *child.children[i], *unionFieldType);
formatStr += (i == 0u ? ":" : ",") + std::to_string(i);
}
child.format = copyName(rootHolder, formatStr);
}

void ArrowConverter::setArrowFormatForInternalID(ArrowSchemaHolder& rootHolder, ArrowSchema& child,
const LogicalType& /*dataType*/) {
child.format = "+s";
Expand All @@ -76,10 +99,10 @@ void ArrowConverter::setArrowFormatForInternalID(ArrowSchemaHolder& rootHolder,
}
child.children = &rootHolder.nestedChildrenPtr.back()[0];
initializeChild(*child.children[0]);
child.children[0]->name = copyName(rootHolder, "table");
child.children[0]->name = copyName(rootHolder, "offset");
setArrowFormat(rootHolder, *child.children[0], *LogicalType::INT64());
initializeChild(*child.children[1]);
child.children[1]->name = copyName(rootHolder, "offset");
child.children[1]->name = copyName(rootHolder, "table");
mxwli marked this conversation as resolved.
Show resolved Hide resolved
setArrowFormat(rootHolder, *child.children[1], *LogicalType::INT64());
}

Expand Down Expand Up @@ -142,7 +165,7 @@ void ArrowConverter::setArrowFormat(ArrowSchemaHolder& rootHolder, ArrowSchema&
child.format = "tsu:";
} break;
case LogicalTypeID::INTERVAL: {
child.format = "tDm";
child.format = "tDu";
} break;
case LogicalTypeID::UUID:
case LogicalTypeID::STRING: {
Expand Down Expand Up @@ -173,6 +196,18 @@ void ArrowConverter::setArrowFormat(ArrowSchemaHolder& rootHolder, ArrowSchema&
child.children[0]->name = "l";
setArrowFormat(rootHolder, **child.children, *ArrayType::getChildType(&dataType));
} break;
case LogicalTypeID::MAP: {
child.format = "+m";
child.n_children = 1;
rootHolder.nestedChildren.emplace_back();
rootHolder.nestedChildren.back().resize(1);
rootHolder.nestedChildrenPtr.emplace_back();
rootHolder.nestedChildrenPtr.back().push_back(&rootHolder.nestedChildren.back()[0]);
initializeChild(rootHolder.nestedChildren.back()[0]);
child.children = &rootHolder.nestedChildrenPtr.back()[0];
child.children[0]->name = "l";
setArrowFormat(rootHolder, **child.children, *ListType::getChildType(&dataType));
} break;
case LogicalTypeID::STRUCT:
case LogicalTypeID::NODE:
case LogicalTypeID::REL:
Expand All @@ -181,6 +216,9 @@ void ArrowConverter::setArrowFormat(ArrowSchemaHolder& rootHolder, ArrowSchema&
case LogicalTypeID::INTERNAL_ID:
setArrowFormatForInternalID(rootHolder, child, dataType);
break;
case LogicalTypeID::UNION:
setArrowFormatForUnion(rootHolder, child, dataType);
break;
default:
KU_UNREACHABLE;
}
Expand Down
135 changes: 132 additions & 3 deletions src/common/arrow/arrow_row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,34 @@ void ArrowRowBatch::templateInitializeVector<LogicalTypeID::ARRAY>(ArrowVector*
vector->childData.push_back(std::move(childVector));
}

template<>
void ArrowRowBatch::templateInitializeVector<LogicalTypeID::MAP>(ArrowVector* vector,
const LogicalType& type, std::int64_t capacity) {
return templateInitializeVector<LogicalTypeID::LIST>(vector, type, capacity);
}

template<>
void ArrowRowBatch::templateInitializeVector<LogicalTypeID::STRUCT>(ArrowVector* vector,
const LogicalType& type, std::int64_t capacity) {
initializeStructVector(vector, type, capacity);
}

template<>
void ArrowRowBatch::templateInitializeVector<LogicalTypeID::UNION>(ArrowVector* vector,
const LogicalType& type, std::int64_t capacity) {
// Interestingly, unions don't have their own validity bitmap
mxwli marked this conversation as resolved.
Show resolved Hide resolved
mxwli marked this conversation as resolved.
Show resolved Hide resolved
// https://arrow.apache.org/docs/format/Columnar.html#union-layout
// Initialize type buffer
vector->data.reserve((capacity) * sizeof(std::uint8_t));
// Initialize offsets buffer
vector->overflow.reserve((capacity) * sizeof(std::int32_t));
// Initialize children
for (auto i = 0u; i < UnionType::getNumFields(&type); i++) {
auto childVector = createVector(*UnionType::getFieldType(&type, i), capacity);
vector->childData.push_back(std::move(childVector));
}
}

void ArrowRowBatch::initializeStructVector(ArrowVector* vector, const LogicalType& type,
std::int64_t capacity) {
initializeNullBits(vector->validity, capacity);
Expand Down Expand Up @@ -184,9 +206,15 @@ std::unique_ptr<ArrowVector> ArrowRowBatch::createVector(const LogicalType& type
case LogicalTypeID::ARRAY: {
templateInitializeVector<LogicalTypeID::ARRAY>(result.get(), type, capacity);
} break;
case LogicalTypeID::MAP: {
templateInitializeVector<LogicalTypeID::MAP>(result.get(), type, capacity);
} break;
case LogicalTypeID::STRUCT: {
templateInitializeVector<LogicalTypeID::STRUCT>(result.get(), type, capacity);
} break;
case LogicalTypeID::UNION: {
mxwli marked this conversation as resolved.
Show resolved Hide resolved
templateInitializeVector<LogicalTypeID::UNION>(result.get(), type, capacity);
} break;
case LogicalTypeID::INTERNAL_ID: {
templateInitializeVector<LogicalTypeID::INTERNAL_ID>(result.get(), type, capacity);
} break;
Expand Down Expand Up @@ -239,6 +267,15 @@ void ArrowRowBatch::templateCopyNonNullValue(ArrowVector* vector, const LogicalT
std::memcpy(vector->data.data() + pos * valSize, &value->val, valSize);
}

template<>
void ArrowRowBatch::templateCopyNonNullValue<LogicalTypeID::INTERVAL>(ArrowVector* vector,
const LogicalType& /*type*/, Value* value, std::int64_t pos) {
auto destAddr = (int64_t*)(vector->data.data() + pos * sizeof(std::int64_t));
auto intervalVal = value->val.intervalVal;
*destAddr = intervalVal.micros + intervalVal.days * Interval::MICROS_PER_DAY +
intervalVal.months * Interval::MICROS_PER_MONTH;
}

template<>
void ArrowRowBatch::templateCopyNonNullValue<LogicalTypeID::BOOL>(ArrowVector* vector,
const LogicalType& /*type*/, Value* value, std::int64_t pos) {
Expand Down Expand Up @@ -330,6 +367,12 @@ void ArrowRowBatch::templateCopyNonNullValue<LogicalTypeID::ARRAY>(ArrowVector*
}
}

template<>
void ArrowRowBatch::templateCopyNonNullValue<LogicalTypeID::MAP>(ArrowVector* vector,
const LogicalType& type, Value* value, std::int64_t pos) {
return templateCopyNonNullValue<LogicalTypeID::LIST>(vector, type, value, pos);
}

template<>
void ArrowRowBatch::templateCopyNonNullValue<LogicalTypeID::STRUCT>(ArrowVector* vector,
const LogicalType& type, Value* value, std::int64_t /*pos*/) {
Expand All @@ -339,6 +382,22 @@ void ArrowRowBatch::templateCopyNonNullValue<LogicalTypeID::STRUCT>(ArrowVector*
}
}

template<>
void ArrowRowBatch::templateCopyNonNullValue<LogicalTypeID::UNION>(ArrowVector* vector,
const LogicalType& type, Value* value, std::int64_t pos) {
auto typeBuffer = (std::uint8_t*)vector->data.data();
auto offsetsBuffer = (std::int32_t*)vector->overflow.data();
for (auto i = 0u; i < UnionType::getNumFields(&type); i++) {
if (*UnionType::getFieldType(&type, i) == *value->children[0]->dataType) {
typeBuffer[pos] = i;
offsetsBuffer[pos] = vector->childData[i]->numValues;
return appendValue(vector->childData[i].get(), *UnionType::getFieldType(&type, i),
value->children[0].get());
}
}
KU_UNREACHABLE; // We should always be able to find a matching type
}

template<>
void ArrowRowBatch::templateCopyNonNullValue<LogicalTypeID::INTERNAL_ID>(ArrowVector* vector,
const LogicalType& /*type*/, Value* value, std::int64_t /*pos*/) {
Expand Down Expand Up @@ -373,10 +432,14 @@ void ArrowRowBatch::templateCopyNonNullValue<LogicalTypeID::REL>(ArrowVector* ve
RelVal::getSrcNodeIDVal(value));
appendValue(vector->childData[1].get(), *StructType::getFieldTypes(&type)[1],
RelVal::getDstNodeIDVal(value));
std::int64_t propertyId = 2;
auto numProperties = NodeVal::getNumProperties(value);
appendValue(vector->childData[2].get(), *StructType::getFieldTypes(&type)[2],
RelVal::getLabelVal(value));
appendValue(vector->childData[3].get(), *StructType::getFieldTypes(&type)[3],
RelVal::getIDVal(value));
std::int64_t propertyId = 4;
auto numProperties = RelVal::getNumProperties(value);
for (auto i = 0u; i < numProperties; i++) {
auto val = NodeVal::getPropertyVal(value, i);
auto val = RelVal::getPropertyVal(value, i);
appendValue(vector->childData[propertyId].get(),
*StructType::getFieldTypes(&type)[propertyId], val);
propertyId++;
Expand Down Expand Up @@ -455,9 +518,15 @@ void ArrowRowBatch::copyNonNullValue(ArrowVector* vector, const LogicalType& typ
case LogicalTypeID::ARRAY: {
templateCopyNonNullValue<LogicalTypeID::ARRAY>(vector, type, value, pos);
} break;
case LogicalTypeID::MAP: {
templateCopyNonNullValue<LogicalTypeID::MAP>(vector, type, value, pos);
} break;
case LogicalTypeID::STRUCT: {
templateCopyNonNullValue<LogicalTypeID::STRUCT>(vector, type, value, pos);
} break;
case LogicalTypeID::UNION: {
templateCopyNonNullValue<LogicalTypeID::UNION>(vector, type, value, pos);
} break;
case LogicalTypeID::INTERNAL_ID: {
templateCopyNonNullValue<LogicalTypeID::INTERNAL_ID>(vector, type, value, pos);
} break;
Expand Down Expand Up @@ -505,13 +574,28 @@ void ArrowRowBatch::templateCopyNullValue<LogicalTypeID::ARRAY>(ArrowVector* vec
vector->numNulls++;
}

template<>
void ArrowRowBatch::templateCopyNullValue<LogicalTypeID::MAP>(ArrowVector* vector,
std::int64_t pos) {
return templateCopyNullValue<LogicalTypeID::LIST>(vector, pos);
}

template<>
void ArrowRowBatch::templateCopyNullValue<LogicalTypeID::STRUCT>(ArrowVector* vector,
std::int64_t pos) {
setBitToZero(vector->validity.data(), pos);
vector->numNulls++;
}

void ArrowRowBatch::copyNullValueUnion(ArrowVector* vector, Value* value, std::int64_t pos) {
auto typeBuffer = (std::uint8_t*)vector->data.data();
auto offsetsBuffer = (std::int32_t*)vector->overflow.data();
typeBuffer[pos] = 0;
offsetsBuffer[pos] = vector->childData[0]->numValues;
copyNullValue(vector->childData[0].get(), value->children[0].get(), pos);
vector->numNulls++;
}

void ArrowRowBatch::copyNullValue(ArrowVector* vector, Value* value, std::int64_t pos) {
switch (value->dataType->getLogicalTypeID()) {
case LogicalTypeID::BOOL: {
Expand Down Expand Up @@ -581,12 +665,18 @@ void ArrowRowBatch::copyNullValue(ArrowVector* vector, Value* value, std::int64_
case LogicalTypeID::ARRAY: {
templateCopyNullValue<LogicalTypeID::ARRAY>(vector, pos);
} break;
case LogicalTypeID::MAP: {
templateCopyNullValue<LogicalTypeID::MAP>(vector, pos);
} break;
case LogicalTypeID::INTERNAL_ID: {
templateCopyNullValue<LogicalTypeID::INTERNAL_ID>(vector, pos);
} break;
case LogicalTypeID::STRUCT: {
templateCopyNullValue<LogicalTypeID::STRUCT>(vector, pos);
} break;
case LogicalTypeID::UNION: {
copyNullValueUnion(vector, value, pos);
} break;
case LogicalTypeID::NODE: {
templateCopyNullValue<LogicalTypeID::NODE>(vector, pos);
} break;
Expand Down Expand Up @@ -671,6 +761,12 @@ ArrowArray* ArrowRowBatch::templateCreateArray<LogicalTypeID::ARRAY>(ArrowVector
return vector.array.get();
}

template<>
ArrowArray* ArrowRowBatch::templateCreateArray<LogicalTypeID::MAP>(ArrowVector& vector,
const LogicalType& type) {
return templateCreateArray<LogicalTypeID::LIST>(vector, type);
}

template<>
ArrowArray* ArrowRowBatch::templateCreateArray<LogicalTypeID::STRUCT>(ArrowVector& vector,
const LogicalType& type) {
Expand Down Expand Up @@ -707,6 +803,33 @@ ArrowArray* ArrowRowBatch::convertInternalIDVectorToArray(ArrowVector& vector,
return vector.array.get();
}

template<>
ArrowArray* ArrowRowBatch::templateCreateArray<LogicalTypeID::UNION>(ArrowVector& vector,
const LogicalType& type) {
// since union is a special case, we make the ArrowArray ourselves instead of using
// createArrayFromVector
auto nChildren = UnionType::getNumFields(&type);
vector.array = std::make_unique<ArrowArray>();
vector.array->private_data = nullptr;
vector.array->release = releaseArrowVector;
vector.array->n_children = nChildren;
vector.childPointers.resize(nChildren);
vector.array->children = vector.childPointers.data();
vector.array->offset = 0;
vector.array->dictionary = nullptr;
vector.array->buffers = vector.buffers.data();
vector.array->null_count = vector.numNulls;
vector.array->length = vector.numValues;
vector.array->n_buffers = 2;
vector.array->buffers[0] = vector.data.data();
vector.array->buffers[1] = vector.overflow.data();
for (auto i = 0u; i < nChildren; i++) {
auto childType = UnionType::getFieldType(&type, i);
vector.childPointers[i] = convertVectorToArray(*vector.childData[i], *childType);
}
return vector.array.get();
}

template<>
ArrowArray* ArrowRowBatch::templateCreateArray<LogicalTypeID::INTERNAL_ID>(ArrowVector& vector,
const LogicalType& type) {
Expand Down Expand Up @@ -794,9 +917,15 @@ ArrowArray* ArrowRowBatch::convertVectorToArray(ArrowVector& vector, const Logic
case LogicalTypeID::ARRAY: {
return templateCreateArray<LogicalTypeID::ARRAY>(vector, type);
}
case LogicalTypeID::MAP: {
return templateCreateArray<LogicalTypeID::MAP>(vector, type);
}
case LogicalTypeID::STRUCT: {
return templateCreateArray<LogicalTypeID::STRUCT>(vector, type);
}
case LogicalTypeID::UNION: {
return templateCreateArray<LogicalTypeID::UNION>(vector, type);
}
case LogicalTypeID::INTERNAL_ID: {
return templateCreateArray<LogicalTypeID::INTERNAL_ID>(vector, type);
}
Expand Down
5 changes: 5 additions & 0 deletions src/common/types/value/rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ Value* RelVal::getPropertyVal(const Value* val, uint64_t index) {
return val->children[index + OFFSET].get();
}

Value* RelVal::getIDVal(const Value* val) {
auto fieldIdx = StructType::getFieldIdx(val->dataType.get(), InternalKeyword::ID);
return val->children[fieldIdx].get();
}

Value* RelVal::getSrcNodeIDVal(const Value* val) {
auto fieldIdx = StructType::getFieldIdx(val->dataType.get(), InternalKeyword::SRC);
return val->children[fieldIdx].get();
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/arrow/arrow_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ struct ArrowConverter {
static void initializeChild(ArrowSchema& child, const std::string& name = "");
static void setArrowFormatForStruct(ArrowSchemaHolder& rootHolder, ArrowSchema& child,
const LogicalType& dataType);
static void setArrowFormatForUnion(ArrowSchemaHolder& rootHolder, ArrowSchema& child,
const LogicalType& dataType);
static void setArrowFormatForInternalID(ArrowSchemaHolder& rootHolder, ArrowSchema& child,
const LogicalType& dataType);
static void setArrowFormat(ArrowSchemaHolder& rootHolder, ArrowSchema& child,
Expand Down
1 change: 1 addition & 0 deletions src/include/common/arrow/arrow_row_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class ArrowRowBatch {
std::int64_t pos);
template<LogicalTypeID DT>
static void templateCopyNullValue(ArrowVector* vector, std::int64_t pos);
static void copyNullValueUnion(ArrowVector* vector, Value* value, std::int64_t pos);
template<LogicalTypeID DT>
static ArrowArray* templateCreateArray(ArrowVector& vector, const LogicalType& type);

Expand Down
4 changes: 4 additions & 0 deletions src/include/common/types/value/rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class RelVal {
* @return the dst nodeID value of the RelVal in Value.
*/
KUZU_API static Value* getDstNodeIDVal(const Value* val);
/**
* @return the internal ID value of the RelVal in Value.
*/
KUZU_API static Value* getIDVal(const Value* val);
/**
* @return the label value of the RelVal.
*/
Expand Down
Loading
Loading