Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export query result to arrow: string data type #1199

Merged
merged 1 commit into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/common/arrow/arrow_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ static void setArrowFormat(ArrowSchema& child, const DataType& type) {
case DataTypeID::INTERVAL: {
child.format = "tDm";
} break;
case DataTypeID::STRING: {
child.format = "u";
} break;
default:
throw InternalException("Unsupported Arrow type " + Types::dataTypeToString(type));
}
Expand Down
244 changes: 199 additions & 45 deletions src/common/arrow/arrow_row_batch.cpp
Original file line number Diff line number Diff line change
@@ -1,31 +1,72 @@
#include "common/arrow/arrow_row_batch.h"

ArrowRowBatch::ArrowRowBatch(std::vector<DataType> types, std::int64_t capacity)
: types{std::move(types)} {
: types{std::move(types)}, numTuples{0} {
auto numVectors = this->types.size();
vectors.resize(numVectors);
for (auto i = 0u; i < numVectors; i++) {
vectors[i] = make_unique<ArrowVector>();
initializeArrowVector(vectors[i].get(), this->types[i], capacity);
vectors[i] = createVector(this->types[i], capacity);
}
}

void ArrowRowBatch::initializeArrowVector(
ArrowVector* vector, const DataType& type, std::int64_t capacity) {
auto numBytesForValidity = getNumBytesForBits(capacity);
vector->validity.resize(numBytesForValidity, 0xFF);
std::int64_t numBytesForData = 0;
if (type.typeID == BOOL) {
numBytesForData = getNumBytesForBits(capacity);
} else {
numBytesForData = Types::getDataTypeSize(type) * capacity;
template<typename T>
void ArrowRowBatch::templateInitializeVector(ArrowVector* vector, std::int64_t capacity) {
initializeNullBits(vector->validity, capacity);
vector->data.reserve(sizeof(T) * capacity);
}

template<>
void ArrowRowBatch::templateInitializeVector<bool>(ArrowVector* vector, std::int64_t capacity) {
initializeNullBits(vector->validity, capacity);
vector->data.reserve(getNumBytesForBits(capacity));
}

template<>
void ArrowRowBatch::templateInitializeVector<std::string>(
ArrowVector* vector, std::int64_t capacity) {
initializeNullBits(vector->validity, capacity);
// Initialize offsets and string values buffer.
vector->data.reserve((capacity + 1) * sizeof(std::uint32_t));
((std::uint32_t*)vector->data.data())[0] = 0;
vector->overflow.reserve(capacity);
}

std::unique_ptr<ArrowVector> ArrowRowBatch::createVector(
const DataType& type, std::int64_t capacity) {
auto result = make_unique<ArrowVector>();
switch (type.typeID) {
case BOOL: {
templateInitializeVector<bool>(result.get(), capacity);
} break;
case INT64: {
templateInitializeVector<std::int64_t>(result.get(), capacity);
} break;
case DOUBLE: {
templateInitializeVector<std::double_t>(result.get(), capacity);
} break;
case DATE: {
templateInitializeVector<date_t>(result.get(), capacity);
} break;
case TIMESTAMP: {
templateInitializeVector<timestamp_t>(result.get(), capacity);
} break;
case INTERVAL: {
templateInitializeVector<interval_t>(result.get(), capacity);
} break;
case STRING: {
templateInitializeVector<std::string>(result.get(), capacity);
} break;
default: {
throw RuntimeException("Unsupported data type " + Types::dataTypeToString(type) +
" for arrow vector initialization.");
}
vector->data.reserve(numBytesForData);
}
return std::move(result);
}

static void getBitPosition(std::int64_t pos, std::int64_t& bytePos, std::int64_t& bitOffset) {
bytePos = pos / 8;
bitOffset = pos % 8;
bytePos = pos >> 3;
bitOffset = pos - (bytePos << 3);
}

static void setBitToZero(std::uint8_t* data, std::int64_t pos) {
Expand All @@ -40,34 +81,107 @@ static void setBitToOne(std::uint8_t* data, std::int64_t pos) {
data[bytePos] |= ((std::uint64_t)1 << bitOffset);
}

void ArrowRowBatch::setValue(ArrowVector* vector, Value* value, std::int64_t pos) {
void ArrowRowBatch::appendValue(ArrowVector* vector, Value* value) {
if (value->isNull_) {
setBitToZero(vector->validity.data(), pos);
vector->numNulls++;
return;
copyNullValue(vector, value, vector->numValues);
} else {
copyNonNullValue(vector, value, vector->numValues);
}
vector->numValues++;
}

template<typename T>
void ArrowRowBatch::templateCopyNonNullValue(ArrowVector* vector, Value* value, std::int64_t pos) {
std::memcpy(vector->data.data() + pos * sizeof(T), &value->val, sizeof(T));
}

template<>
void ArrowRowBatch::templateCopyNonNullValue<bool>(
ArrowVector* vector, Value* value, std::int64_t pos) {
if (value->val.booleanVal) {
setBitToOne(vector->data.data(), pos);
} else {
setBitToZero(vector->data.data(), pos);
}
}

template<>
void ArrowRowBatch::templateCopyNonNullValue<std::string>(
ArrowVector* vector, Value* value, std::int64_t pos) {
auto offsets = (std::uint32_t*)vector->data.data();
auto strLength = value->strVal.length();
offsets[pos + 1] = offsets[pos] + strLength;
vector->overflow.resize(offsets[pos + 1]);
std::memcpy(vector->overflow.data() + offsets[pos], value->strVal.data(), strLength);
}

void ArrowRowBatch::copyNonNullValue(ArrowVector* vector, Value* value, std::int64_t pos) {
switch (value->dataType.typeID) {
case BOOL: {
if (value->val.booleanVal) {
setBitToOne(vector->data.data(), pos);
} else {
setBitToZero(vector->data.data(), pos);
}
templateCopyNonNullValue<bool>(vector, value, pos);
} break;
case INT64: {
templateCopyNonNullValue<std::int64_t>(vector, value, pos);
} break;
case DOUBLE: {
templateCopyNonNullValue<std::double_t>(vector, value, pos);
} break;
case DATE: {
templateCopyNonNullValue<date_t>(vector, value, pos);
} break;
case TIMESTAMP: {
templateCopyNonNullValue<timestamp_t>(vector, value, pos);
} break;
case INTERVAL: {
templateCopyNonNullValue<interval_t>(vector, value, pos);
} break;
case STRING: {
templateCopyNonNullValue<std::string>(vector, value, pos);
} break;
default: {
throw RuntimeException("Data type " + Types::dataTypeToString(value->dataType) +
" is not supported for arrow exportation.");
}
}
}

template<typename T>
void ArrowRowBatch::templateCopyNullValue(ArrowVector* vector, Value* value, std::int64_t pos) {
setBitToZero(vector->validity.data(), vector->numValues);
vector->numNulls++;
}

template<>
void ArrowRowBatch::templateCopyNullValue<std::string>(
ArrowVector* vector, Value* value, std::int64_t pos) {
auto offsets = (std::uint32_t*)vector->data.data();
offsets[pos + 1] = offsets[pos];
setBitToZero(vector->validity.data(), vector->numValues);
vector->numNulls++;
}

void ArrowRowBatch::copyNullValue(ArrowVector* vector, Value* value, std::int64_t pos) {
switch (value->dataType.typeID) {
case BOOL: {
templateCopyNullValue<bool>(vector, value, pos);
} break;
case INT64: {
((std::int64_t*)vector->data.data())[pos] = value->val.int64Val;
templateCopyNullValue<std::int64_t>(vector, value, pos);
} break;
case DOUBLE: {
((std::double_t*)vector->data.data())[pos] = value->val.doubleVal;
templateCopyNullValue<std::double_t>(vector, value, pos);
} break;
case DATE: {
((date_t*)vector->data.data())[pos] = value->val.dateVal;
templateCopyNullValue<date_t>(vector, value, pos);
} break;
case TIMESTAMP: {
((timestamp_t*)vector->data.data())[pos] = value->val.timestampVal;
templateCopyNullValue<timestamp_t>(vector, value, pos);
} break;
case INTERVAL: {
((interval_t*)vector->data.data())[pos] = value->val.intervalVal;
templateCopyNullValue<interval_t>(vector, value, pos);
} break;
case STRING: {
templateCopyNullValue<std::string>(vector, value, pos);
} break;
default: {
throw RuntimeException("Data type " + Types::dataTypeToString(value->dataType) +
Expand All @@ -85,9 +199,8 @@ static void releaseArrowVector(ArrowArray* array) {
delete holder;
}

ArrowArray* ArrowRowBatch::finalizeArrowChild(const DataType& type, ArrowVector& vector) {
static unique_ptr<ArrowArray> createArrayFromVector(ArrowVector& vector) {
auto result = make_unique<ArrowArray>();

result->private_data = nullptr;
result->release = releaseArrowVector;
result->n_children = 0;
Expand All @@ -96,52 +209,93 @@ ArrowArray* ArrowRowBatch::finalizeArrowChild(const DataType& type, ArrowVector&
result->buffers = vector.buffers.data();
result->null_count = vector.numNulls;
result->length = vector.numValues;
result->n_buffers = 2;
result->buffers[0] = vector.validity.data();
result->buffers[1] = vector.data.data();
result->n_buffers = 2;
return std::move(result);
}

template<typename T>
ArrowArray* ArrowRowBatch::templateCreateArray(ArrowVector& vector) {
auto result = createArrayFromVector(vector);
vector.array = std::move(result);
return vector.array.get();
}

ArrowArray ArrowRowBatch::finalize() {
template<>
ArrowArray* ArrowRowBatch::templateCreateArray<string>(ArrowVector& vector) {
auto result = createArrayFromVector(vector);
result->n_buffers = 3;
result->buffers[2] = vector.overflow.data();
vector.array = std::move(result);
return vector.array.get();
}

ArrowArray* ArrowRowBatch::convertVectorToArray(const DataType& type, ArrowVector& vector) {
switch (type.typeID) {
case BOOL: {
return templateCreateArray<bool>(vector);
}
case INT64: {
return templateCreateArray<std::int64_t>(vector);
}
case DOUBLE: {
return templateCreateArray<std::double_t>(vector);
}
case DATE: {
return templateCreateArray<date_t>(vector);
}
case TIMESTAMP: {
return templateCreateArray<timestamp_t>(vector);
}
case INTERVAL: {
return templateCreateArray<interval_t>(vector);
}
case STRING: {
return templateCreateArray<string>(vector);
}
default: {
throw RuntimeException("Data type " + Types::dataTypeToString(type) +
" is not supported for arrow exportation.");
}
}
}

ArrowArray ArrowRowBatch::toArray() {
auto rootHolder = make_unique<ArrowVector>();
ArrowArray result;
rootHolder->childPointers.resize(types.size());
result.children = rootHolder->childPointers.data();
result.n_children = (std::int64_t)types.size();
result.length = numRows;
result.length = numTuples;
result.n_buffers = 1;
result.buffers = rootHolder->buffers.data(); // no actual buffer
result.offset = 0;
result.null_count = 0;
result.dictionary = nullptr;
rootHolder->childData = std::move(vectors);
for (auto i = 0u; i < rootHolder->childData.size(); i++) {
rootHolder->childPointers[i] = finalizeArrowChild(types[i], *rootHolder->childData[i]);
rootHolder->childPointers[i] = convertVectorToArray(types[i], *rootHolder->childData[i]);
}

result.private_data = rootHolder.release();
result.release = releaseArrowVector;
return result;
}

ArrowArray ArrowRowBatch::append(main::QueryResult& queryResult, std::int64_t chunkSize) {
std::int64_t numTuples = 0;
std::int64_t numTuplesInBatch = 0;
auto numColumns = queryResult.getColumnNames().size();
while (numTuples < chunkSize) {
while (numTuplesInBatch < chunkSize) {
if (!queryResult.hasNext()) {
break;
}
auto tuple = queryResult.getNext();
vector<std::uint32_t> colWidths(numColumns, 10);
for (auto i = 0u; i < numColumns; i++) {
setValue(vectors[i].get(), tuple->getValue(i), numTuples);
appendValue(vectors[i].get(), tuple->getValue(i));
}
numTuples++;
}
for (auto i = 0u; i < numColumns; i++) {
vectors[i]->numValues = numTuples;
numTuplesInBatch++;
}
numRows = numTuples;
return finalize();
numTuples += numTuplesInBatch;
return toArray();
}
32 changes: 24 additions & 8 deletions src/include/common/arrow/arrow_row_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace common {

// An Arrow Vector(i.e., Array) is defined by a few pieces of metadata and data:
// 1) a logical data type;
// 2) a sequence of buffers: validity bitmaps, data buffer, offsets(optional), children(optional).
// 2) a sequence of buffers: validity bitmaps, data buffer, overflow(optional), children(optional).
// 3) a length as a 64-bit signed integer;
// 4) a null count as a 64-bit signed integer;
// 5) an optional dictionary for dictionary-encoded arrays.
Expand All @@ -28,6 +28,7 @@ static inline std::int64_t getNumBytesForBits(std::int64_t numBits) {
struct ArrowVector {
ArrowBuffer data;
ArrowBuffer validity;
ArrowBuffer overflow;

std::int64_t numValues = 0;
std::int64_t numNulls = 0;
Expand All @@ -49,17 +50,32 @@ class ArrowRowBatch {
ArrowArray append(main::QueryResult& queryResult, std::int64_t chunkSize);

private:
static void initializeArrowVector(
ArrowVector* vector, const DataType& type, std::int64_t capacity);
static void setValue(ArrowVector* vector, Value* value, std::int64_t pos);
static ArrowArray* finalizeArrowChild(const DataType& type, ArrowVector& rowBatch);

ArrowArray finalize();
static std::unique_ptr<ArrowVector> createVector(const DataType& type, std::int64_t capacity);
static void appendValue(ArrowVector* vector, Value* value);

static ArrowArray* convertVectorToArray(const DataType& type, ArrowVector& vector);
static inline void initializeNullBits(ArrowBuffer& validity, std::int64_t capacity) {
auto numBytesForValidity = getNumBytesForBits(capacity);
validity.resize(numBytesForValidity, 0xFF);
}
static void copyNonNullValue(ArrowVector* vector, Value* value, std::int64_t pos);
static void copyNullValue(ArrowVector* vector, Value* value, std::int64_t pos);

template<typename T>
static void templateInitializeVector(ArrowVector* vector, std::int64_t capacity);
template<typename T>
static void templateCopyNonNullValue(ArrowVector* vector, Value* value, std::int64_t pos);
template<typename T>
static void templateCopyNullValue(ArrowVector* vector, Value* value, std::int64_t pos);
template<typename T>
static ArrowArray* templateCreateArray(ArrowVector& vector);

ArrowArray toArray();

private:
std::vector<DataType> types;
std::vector<std::unique_ptr<ArrowVector>> vectors;
std::int64_t numRows = 0;
std::int64_t numTuples;
};

} // namespace common
Expand Down
Loading