Skip to content

Commit

Permalink
Merge pull request #1193 from kuzudb/arrow
Browse files Browse the repository at this point in the history
Export query result (fixed sized values only) to arrow
  • Loading branch information
ray6080 committed Jan 21, 2023
2 parents cf2e3b3 + ecb9e3f commit 698ac85
Show file tree
Hide file tree
Showing 38 changed files with 734 additions and 148 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,13 @@ pytest: arrow
cd $(ROOT_DIR)/tools/python_api/test && \
python3 -m pytest -v test_main.py

clean-python-api:
rm -rf tools/python_api/build

clean-external:
rm -rf external/build

clean:
clean: clean-python-api
rm -rf build

clean-all: clean-external clean
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
add_subdirectory(arrow)
add_subdirectory(csv_reader)
add_subdirectory(data_chunk)
add_subdirectory(task_system)
Expand Down
8 changes: 8 additions & 0 deletions src/common/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Build the Arrow export sources as an OBJECT library: the sources are compiled,
# but no archive or shared library is produced for this target on its own.
add_library(kuzu_common_arrow
OBJECT
arrow_row_batch.cpp
arrow_converter.cpp)

# Append this target's object files to ALL_OBJECT_FILES and publish the updated
# list to the parent directory scope.
set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_common_arrow>
PARENT_SCOPE)
100 changes: 100 additions & 0 deletions src/common/arrow/arrow_converter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include "common/arrow/arrow_converter.h"

#include "common/arrow/arrow_row_batch.h"

namespace kuzu {
namespace common {

// Backing storage for an exported root ArrowSchema. An instance is attached to the root
// schema through its private_data pointer and deleted by releaseArrowSchema().
struct ArrowSchemaHolder {
// One ArrowSchema per exported column.
vector<ArrowSchema> children;
// Pointers to the elements of `children`; the root schema's `children` member points at
// this array.
vector<ArrowSchema*> childrenPtrs;
// Heap-allocated character buffers whose lifetime is tied to the holder.
vector<unique_ptr<char[]>> ownedTypeNames;
};

// Release callback installed on the schemas produced in this file.
//
// Does nothing when `schema` is null or has already been released (release == nullptr),
// which makes repeated calls harmless. Otherwise marks the schema as released and deletes
// the ArrowSchemaHolder stored in private_data.
static void releaseArrowSchema(ArrowSchema* schema) {
    // Only a schema with a live release callback still owns its holder. The previous check
    // was inverted: live schemas returned early (leaking the holder) and released ones fell
    // through to a second delete.
    if (!schema || !schema->release) {
        return;
    }
    schema->release = nullptr;
    // Child schemas are initialized with a null private_data, so for them this is a no-op.
    auto holder = static_cast<ArrowSchemaHolder*>(schema->private_data);
    delete holder;
}

// Sets up `child` as a nullable leaf schema: no children, no metadata, no dictionary.
//
// `child.name` is pointed at `name`'s own character storage (no copy is made), so the string
// passed in must stay alive for as long as the name is read through the schema. Relying on
// the default argument would leave the pointer referring to a temporary.
static void initializeChild(ArrowSchema& child, const string& name = "") {
    //! Describe the column itself
    child.name = name.c_str();
    child.flags = ARROW_FLAG_NULLABLE;

    //! A leaf has nothing nested beneath it
    child.n_children = 0;
    child.children = nullptr;
    child.dictionary = nullptr;
    child.metadata = nullptr;

    //! No private data of its own: the child is cleaned up by its parent
    child.release = releaseArrowSchema;
    child.private_data = nullptr;
}

// Assigns the Arrow format string that corresponds to `type` to `child.format`.
// The strings are string literals, so the pointers stay valid for the program's lifetime.
// Throws InternalException for any type id not listed below.
static void setArrowFormat(ArrowSchema& child, const DataType& type) {
    const char* format = nullptr;
    switch (type.typeID) {
    case DataTypeID::BOOL:
        format = "b";
        break;
    case DataTypeID::INT64:
        format = "l";
        break;
    case DataTypeID::DOUBLE:
        format = "g";
        break;
    case DataTypeID::DATE:
        format = "tdD";
        break;
    case DataTypeID::TIMESTAMP:
        format = "tsu:";
        break;
    case DataTypeID::INTERVAL:
        format = "tDm";
        break;
    default:
        throw InternalException("Unsupported Arrow type " + Types::dataTypeToString(type));
    }
    child.format = format;
}

// Describes a query result as an Arrow struct schema ("+s") with one nullable child schema
// per column.
//
// outSchema: caller-provided schema to fill in; must not be null.
// types:     data type of each column.
// names:     name of each column; must have the same length as `types`.
//
// On success, outSchema owns an ArrowSchemaHolder through private_data, which is freed by
// the installed release callback. Column names are copied into that holder, so the schema
// stays valid after `names` goes out of scope.
//
// Throws InternalException (raised by setArrowFormat) when a column type is unsupported.
// outSchema is only written once every child has been built, so it is left untouched then.
void ArrowConverter::toArrowSchema(
    ArrowSchema* outSchema, std::vector<DataType>& types, std::vector<std::string>& names) {
    assert(outSchema && types.size() == names.size());
    auto rootHolder = make_unique<ArrowSchemaHolder>();

    auto columnCount = static_cast<int64_t>(names.size());
    rootHolder->children.resize(columnCount);
    rootHolder->childrenPtrs.resize(columnCount);
    rootHolder->ownedTypeNames.reserve(columnCount);
    for (int64_t i = 0; i < columnCount; i++) {
        auto& child = rootHolder->children[i];
        initializeChild(child, names[i]);

        // Give the child a holder-owned, NUL-terminated copy of its name rather than a
        // pointer into the caller's string. make_unique<char[]> zero-fills the buffer, which
        // supplies the terminator.
        const auto nameLength = names[i].size();
        rootHolder->ownedTypeNames.push_back(make_unique<char[]>(nameLength + 1));
        auto* ownedName = rootHolder->ownedTypeNames.back().get();
        names[i].copy(ownedName, nameLength);
        child.name = ownedName;

        setArrowFormat(child, types[i]);
        rootHolder->childrenPtrs[i] = &child;
    }

    // Nothing can throw from here on, so it is safe to publish into outSchema.
    outSchema->format = "+s"; // struct apparently
    outSchema->flags = 0;
    outSchema->metadata = nullptr;
    outSchema->name = "kuzu_query_result";
    outSchema->dictionary = nullptr;
    outSchema->children = rootHolder->childrenPtrs.data();
    outSchema->n_children = columnCount;

    // Hand ownership of the holder to the schema; releaseArrowSchema deletes it.
    outSchema->private_data = rootHolder.release();
    outSchema->release = releaseArrowSchema;
}

// Pulls up to `chunkSize` tuples from `queryResult` into a temporary ArrowRowBatch and
// stores the resulting ArrowArray in `*outArray`.
void ArrowConverter::toArrowArray(
    main::QueryResult& queryResult, ArrowArray* outArray, std::int64_t chunkSize) {
    auto columnTypes = queryResult.getColumnDataTypes();
    ArrowRowBatch batch(columnTypes, chunkSize);
    *outArray = batch.append(queryResult, chunkSize);
}

} // namespace common
} // namespace kuzu
147 changes: 147 additions & 0 deletions src/common/arrow/arrow_row_batch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#include "common/arrow/arrow_row_batch.h"

// Creates one ArrowVector per column type and initializes each of them for `capacity` rows.
ArrowRowBatch::ArrowRowBatch(std::vector<DataType> types, std::int64_t capacity)
    : types{std::move(types)} {
    // The parameter shadows the member, so the member is accessed through `this`.
    vectors.reserve(this->types.size());
    for (const auto& columnType : this->types) {
        vectors.push_back(make_unique<ArrowVector>());
        initializeArrowVector(vectors.back().get(), columnType, capacity);
    }
}

// Sizes the validity and data buffers of `vector` for `capacity` values of `type`.
//
// vector:   the column buffer set to initialize.
// type:     column data type; BOOL is bit-packed, every other type uses
//           Types::getDataTypeSize(type) bytes per value.
// capacity: number of rows the buffers must be able to hold.
void ArrowRowBatch::initializeArrowVector(
    ArrowVector* vector, const DataType& type, std::int64_t capacity) {
    // Every validity bit starts as 1; setValue() clears the bit of each null value.
    auto numBytesForValidity = getNumBytesForBits(capacity);
    vector->validity.resize(numBytesForValidity, 0xFF);
    std::int64_t numBytesForData = 0;
    if (type.typeID == BOOL) {
        numBytesForData = getNumBytesForBits(capacity);
    } else {
        numBytesForData = Types::getDataTypeSize(type) * capacity;
    }
    // resize(), not reserve(): setValue() writes through data.data()[pos], so the elements
    // must actually exist. reserve() only grows the capacity and leaves size() at 0, making
    // those writes out of bounds. resize() also zero-fills, so the read-modify-write done
    // for BOOL bits starts from defined bytes.
    vector->data.resize(numBytesForData);
}

// Splits the bit index `pos` into the index of the byte holding it (`bytePos`) and the
// bit's position inside that byte (`bitOffset`).
static void getBitPosition(std::int64_t pos, std::int64_t& bytePos, std::int64_t& bitOffset) {
    constexpr std::int64_t kBitsPerByte = 8;
    bytePos = pos / kBitsPerByte;
    bitOffset = pos % kBitsPerByte;
}

// Clears bit number `pos` in the bitmap starting at `data`; all other bits are unchanged.
static void setBitToZero(std::uint8_t* data, std::int64_t pos) {
    std::int64_t byteIdx;
    std::int64_t bitIdx;
    getBitPosition(pos, byteIdx, bitIdx);
    const auto keepMask = ~(static_cast<std::uint64_t>(1) << bitIdx);
    data[byteIdx] &= keepMask;
}

// Sets bit number `pos` in the bitmap starting at `data`; all other bits are unchanged.
static void setBitToOne(std::uint8_t* data, std::int64_t pos) {
    std::int64_t byteIdx;
    std::int64_t bitIdx;
    getBitPosition(pos, byteIdx, bitIdx);
    const auto bitMask = static_cast<std::uint64_t>(1) << bitIdx;
    data[byteIdx] |= bitMask;
}

// Writes `value` into row `pos` of `vector`.
//
// A null value only clears the row's validity bit and bumps the null counter; the data
// buffer is left as it was. A non-null BOOL is stored as a single bit; the other supported
// types are stored as raw fixed-size values at index `pos`.
// Throws RuntimeException when the value's type has no case below.
//
// NOTE(review): INTERVAL is stored as a raw interval_t while setArrowFormat advertises
// "tDm" for it -- confirm the element width matches what consumers expect.
void ArrowRowBatch::setValue(ArrowVector* vector, Value* value, std::int64_t pos) {
    if (value->isNull_) {
        setBitToZero(vector->validity.data(), pos);
        vector->numNulls++;
        return;
    }
    auto* rawData = vector->data.data();
    switch (value->dataType.typeID) {
    case BOOL: {
        auto writeBit = value->val.booleanVal ? setBitToOne : setBitToZero;
        writeBit(rawData, pos);
        return;
    }
    case INT64:
        reinterpret_cast<std::int64_t*>(rawData)[pos] = value->val.int64Val;
        return;
    case DOUBLE:
        reinterpret_cast<std::double_t*>(rawData)[pos] = value->val.doubleVal;
        return;
    case DATE:
        reinterpret_cast<date_t*>(rawData)[pos] = value->val.dateVal;
        return;
    case TIMESTAMP:
        reinterpret_cast<timestamp_t*>(rawData)[pos] = value->val.timestampVal;
        return;
    case INTERVAL:
        reinterpret_cast<interval_t*>(rawData)[pos] = value->val.intervalVal;
        return;
    default:
        throw RuntimeException("Data type " + Types::dataTypeToString(value->dataType) +
                               " is not supported for arrow exportation.");
    }
}

// Release callback for the ArrowArrays built by ArrowRowBatch.
// A null array, or one whose release pointer is already null, is left alone. Otherwise the
// array is marked released and the ArrowVector stored in private_data is deleted.
static void releaseArrowVector(ArrowArray* array) {
    const bool nothingToRelease = array == nullptr || array->release == nullptr;
    if (nothingToRelease) {
        return;
    }
    array->release = nullptr;
    // Child arrays are created with a null private_data, so for them this is a no-op.
    delete static_cast<ArrowVector*>(array->private_data);
}

// Builds the ArrowArray describing one column and stores it in `vector.array`.
// The array exposes two buffers: the validity bitmap (index 0) and the value data (index 1).
// Returns a non-owning pointer to the array; `vector` keeps ownership.
ArrowArray* ArrowRowBatch::finalizeArrowChild(const DataType& type, ArrowVector& vector) {
    vector.array = make_unique<ArrowArray>();
    auto* child = vector.array.get();

    // Counts recorded while appending.
    child->length = vector.numValues;
    child->null_count = vector.numNulls;
    child->offset = 0;

    // Buffer table: validity first, then data.
    child->buffers = vector.buffers.data();
    child->buffers[0] = vector.validity.data();
    child->buffers[1] = vector.data.data();
    child->n_buffers = 2;

    // A column has no nested arrays and no dictionary.
    child->n_children = 0;
    child->dictionary = nullptr;

    // No private data of its own: the storage lives in `vector`.
    child->release = releaseArrowVector;
    child->private_data = nullptr;

    return child;
}

// Packages the column vectors into a root ArrowArray with one child array per column.
// Ownership of the column vectors moves from this batch into a heap-allocated ArrowVector,
// which is attached to the result as private_data and freed by releaseArrowVector.
ArrowArray ArrowRowBatch::finalize() {
    auto rootHolder = make_unique<ArrowVector>();
    rootHolder->childPointers.resize(types.size());

    // The holder takes over the column vectors; `vectors` is moved-from afterwards.
    rootHolder->childData = std::move(vectors);
    const auto numChildren = rootHolder->childData.size();
    for (auto childIdx = 0u; childIdx < numChildren; childIdx++) {
        rootHolder->childPointers[childIdx] =
            finalizeArrowChild(types[childIdx], *rootHolder->childData[childIdx]);
    }

    ArrowArray result;
    result.length = numRows;
    result.null_count = 0;
    result.offset = 0;
    result.n_buffers = 1;
    result.buffers = rootHolder->buffers.data(); // no actual buffer
    result.n_children = (std::int64_t)types.size();
    result.children = rootHolder->childPointers.data();
    result.dictionary = nullptr;

    result.release = releaseArrowVector;
    result.private_data = rootHolder.release();
    return result;
}

// Reads tuples from `queryResult` until `chunkSize` tuples have been consumed or the result
// has no more tuples, writing each column value into the matching vector. It then records
// the row count on every vector and on the batch, and returns the finalized ArrowArray.
ArrowArray ArrowRowBatch::append(main::QueryResult& queryResult, std::int64_t chunkSize) {
    std::int64_t numTuples = 0;
    const auto numColumns = queryResult.getColumnNames().size();
    // hasNext() is consulted only while the chunk still has room.
    for (; numTuples < chunkSize && queryResult.hasNext(); numTuples++) {
        auto tuple = queryResult.getNext();
        for (auto col = 0u; col < numColumns; col++) {
            setValue(vectors[col].get(), tuple->getValue(col), numTuples);
        }
    }
    for (auto col = 0u; col < numColumns; col++) {
        vectors[col]->numValues = numTuples;
    }
    numRows = numTuples;
    return finalize();
}
6 changes: 0 additions & 6 deletions src/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,5 @@ vector<string> StringUtils::split(const string& input, const string& delimiter)
return result;
}

// Returns the calling thread's id, formatted by streaming std::this_thread::get_id() into a
// string stream.
string ThreadUtils::getThreadIDString() {
    std::ostringstream idStream;
    idStream << std::this_thread::get_id();
    return idStream.str();
}

} // namespace common
} // namespace kuzu
56 changes: 56 additions & 0 deletions src/include/common/arrow/arrow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

// The Arrow C data interface.
// https://arrow.apache.org/docs/format/CDataInterface.html

#include <stdint.h>

#ifdef __cplusplus
extern "C" {
#endif

#ifndef ARROW_C_DATA_INTERFACE
#define ARROW_C_DATA_INTERFACE

#define ARROW_FLAG_DICTIONARY_ORDERED 1
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4

// Type description of an exported array, as declared by the Arrow C data interface linked
// at the top of this header. NOTE(review): this is an ABI structure shared with other
// Arrow implementations -- presumably field order and types must stay exactly as the
// specification defines them; confirm before editing.
struct ArrowSchema {
// Array type description
// `format` is the type's format string and `name` the field name, both C strings.
const char* format;
const char* name;
const char* metadata;
// Bitwise combination of the ARROW_FLAG_* values defined above.
int64_t flags;
// Number of entries in `children`.
int64_t n_children;
struct ArrowSchema** children;
struct ArrowSchema* dictionary;

// Release callback
// Receives a pointer to this structure.
void (*release)(struct ArrowSchema*);
// Opaque producer-specific data
void* private_data;
};

// Data of an exported array, as declared by the Arrow C data interface linked at the top
// of this header. NOTE(review): this is an ABI structure shared with other Arrow
// implementations -- presumably field order and types must stay exactly as the
// specification defines them; confirm before editing.
struct ArrowArray {
// Array data description
int64_t length;
int64_t null_count;
int64_t offset;
// Number of entries in `buffers`.
int64_t n_buffers;
// Number of entries in `children`.
int64_t n_children;
const void** buffers;
struct ArrowArray** children;
struct ArrowArray* dictionary;

// Release callback
// Receives a pointer to this structure.
void (*release)(struct ArrowArray*);
// Opaque producer-specific data
void* private_data;
};

#endif // ARROW_C_DATA_INTERFACE

#ifdef __cplusplus
}
#endif
Loading

0 comments on commit 698ac85

Please sign in to comment.