Skip to content

Commit

Permalink
add export to arrow for string
Browse files Browse the repository at this point in the history
  • Loading branch information
guodong authored and guodong committed Jan 23, 2023
1 parent 9253d89 commit 9cc5578
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 150 deletions.
3 changes: 3 additions & 0 deletions src/common/arrow/arrow_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ static void setArrowFormat(ArrowSchema& child, const DataType& type) {
case DataTypeID::INTERVAL: {
child.format = "tDm";
} break;
case DataTypeID::STRING: {
child.format = "u";
} break;
default:
throw InternalException("Unsupported Arrow type " + Types::dataTypeToString(type));
}
Expand Down
244 changes: 199 additions & 45 deletions src/common/arrow/arrow_row_batch.cpp
Original file line number Diff line number Diff line change
@@ -1,31 +1,72 @@
#include "common/arrow/arrow_row_batch.h"

ArrowRowBatch::ArrowRowBatch(std::vector<DataType> types, std::int64_t capacity)
: types{std::move(types)} {
: types{std::move(types)}, numTuples{0} {
auto numVectors = this->types.size();
vectors.resize(numVectors);
for (auto i = 0u; i < numVectors; i++) {
vectors[i] = make_unique<ArrowVector>();
initializeArrowVector(vectors[i].get(), this->types[i], capacity);
vectors[i] = createVector(this->types[i], capacity);
}
}

void ArrowRowBatch::initializeArrowVector(
ArrowVector* vector, const DataType& type, std::int64_t capacity) {
auto numBytesForValidity = getNumBytesForBits(capacity);
vector->validity.resize(numBytesForValidity, 0xFF);
std::int64_t numBytesForData = 0;
if (type.typeID == BOOL) {
numBytesForData = getNumBytesForBits(capacity);
} else {
numBytesForData = Types::getDataTypeSize(type) * capacity;
// Prepares `vector` to hold up to `capacity` fixed-width values of type T:
// the validity bitmap is initialized and the data buffer is sized to
// sizeof(T) * capacity zero-filled bytes.
template<typename T>
void ArrowRowBatch::templateInitializeVector(ArrowVector* vector, std::int64_t capacity) {
    initializeNullBits(vector->validity, capacity);
    // resize rather than reserve: values are written through data.data() by
    // position, so those bytes must lie inside the buffer's size, not merely
    // inside its capacity.
    vector->data.resize(sizeof(T) * capacity, 0);
}

// Boolean specialization: values are bit-packed, so the data buffer needs
// getNumBytesForBits(capacity) bytes instead of one element per value.
template<>
void ArrowRowBatch::templateInitializeVector<bool>(ArrowVector* vector, std::int64_t capacity) {
    initializeNullBits(vector->validity, capacity);
    // resize rather than reserve: bits are set and cleared in place through
    // data.data(), so every byte must exist and start from a known value
    // before the first read-modify-write.
    vector->data.resize(getNumBytesForBits(capacity), 0);
}

// String specialization: `data` holds (capacity + 1) 32-bit offsets and
// `overflow` holds the concatenated string bytes.
template<>
void ArrowRowBatch::templateInitializeVector<std::string>(
    ArrowVector* vector, std::int64_t capacity) {
    initializeNullBits(vector->validity, capacity);
    // Size (not just reserve) the offsets buffer so that offsets[0..capacity]
    // are addressable elements. The zero fill also establishes offsets[0] == 0,
    // which every later offset is computed from.
    vector->data.resize((capacity + 1) * sizeof(std::uint32_t), 0);
    // The string bytes buffer is grown with resize() as values are appended,
    // so a capacity hint is sufficient here.
    vector->overflow.reserve(capacity);
}

// Allocates an ArrowVector and initializes its buffers for `type`, sized for
// `capacity` values.
// Throws RuntimeException when `type` has no initializer below.
std::unique_ptr<ArrowVector> ArrowRowBatch::createVector(
    const DataType& type, std::int64_t capacity) {
    auto result = make_unique<ArrowVector>();
    switch (type.typeID) {
    case BOOL: {
        templateInitializeVector<bool>(result.get(), capacity);
    } break;
    case INT64: {
        templateInitializeVector<std::int64_t>(result.get(), capacity);
    } break;
    case DOUBLE: {
        templateInitializeVector<std::double_t>(result.get(), capacity);
    } break;
    case DATE: {
        templateInitializeVector<date_t>(result.get(), capacity);
    } break;
    case TIMESTAMP: {
        templateInitializeVector<timestamp_t>(result.get(), capacity);
    } break;
    case INTERVAL: {
        templateInitializeVector<interval_t>(result.get(), capacity);
    } break;
    case STRING: {
        templateInitializeVector<std::string>(result.get(), capacity);
    } break;
    default: {
        throw RuntimeException("Unsupported data type " + Types::dataTypeToString(type) +
                               " for arrow vector initialization.");
    }
    }
    // Returning the local by name lets the compiler elide or move it;
    // an explicit std::move here would only inhibit copy elision.
    return result;
}

// Splits the bit index `pos` into the index of the byte containing it
// (`bytePos`) and the bit's offset inside that byte (`bitOffset`, 0..7 for
// non-negative `pos`).
static void getBitPosition(std::int64_t pos, std::int64_t& bytePos, std::int64_t& bitOffset) {
    bytePos = pos >> 3;               // pos / 8 for non-negative pos
    bitOffset = pos - (bytePos << 3); // remainder: pos - bytePos * 8
}

static void setBitToZero(std::uint8_t* data, std::int64_t pos) {
Expand All @@ -40,34 +81,107 @@ static void setBitToOne(std::uint8_t* data, std::int64_t pos) {
data[bytePos] |= ((std::uint64_t)1 << bitOffset);
}

void ArrowRowBatch::setValue(ArrowVector* vector, Value* value, std::int64_t pos) {
void ArrowRowBatch::appendValue(ArrowVector* vector, Value* value) {
if (value->isNull_) {
setBitToZero(vector->validity.data(), pos);
vector->numNulls++;
return;
copyNullValue(vector, value, vector->numValues);
} else {
copyNonNullValue(vector, value, vector->numValues);
}
vector->numValues++;
}

// Copies sizeof(T) bytes starting at value->val into slot `pos` of the
// vector's data buffer (byte offset pos * sizeof(T)).
template<typename T>
void ArrowRowBatch::templateCopyNonNullValue(ArrowVector* vector, Value* value, std::int64_t pos) {
    auto destination = vector->data.data() + pos * sizeof(T);
    const auto* source = &value->val;
    std::memcpy(destination, source, sizeof(T));
}

// Boolean specialization: writes value->val.booleanVal as the single bit at
// index `pos` of the vector's data buffer.
template<>
void ArrowRowBatch::templateCopyNonNullValue<bool>(
    ArrowVector* vector, Value* value, std::int64_t pos) {
    auto bits = vector->data.data();
    if (!value->val.booleanVal) {
        setBitToZero(bits, pos);
        return;
    }
    setBitToOne(bits, pos);
}

// String specialization: records the end offset of the string in
// offsets[pos + 1], grows the overflow buffer to that end offset, and copies
// the string bytes in starting at offsets[pos].
// NOTE(review): offsets are 32-bit; the sum below is narrowed on assignment,
// so this presumably relies on the total string bytes fitting in 32 bits — confirm.
template<>
void ArrowRowBatch::templateCopyNonNullValue<std::string>(
    ArrowVector* vector, Value* value, std::int64_t pos) {
    auto offsets = reinterpret_cast<std::uint32_t*>(vector->data.data());
    const auto numBytes = value->strVal.length();
    const auto begin = offsets[pos];
    offsets[pos + 1] = begin + numBytes;
    vector->overflow.resize(offsets[pos + 1]);
    std::memcpy(vector->overflow.data() + begin, value->strVal.data(), numBytes);
}

// Writes the non-null `value` into slot `pos` of `vector`, selecting the
// typed copy routine from value->dataType.typeID.
// Throws RuntimeException for a type id that has no case below.
void ArrowRowBatch::copyNonNullValue(ArrowVector* vector, Value* value, std::int64_t pos) {
    switch (value->dataType.typeID) {
    case BOOL: {
        // The bit is set exactly once, inside the bool specialization.
        templateCopyNonNullValue<bool>(vector, value, pos);
    } break;
    case INT64: {
        templateCopyNonNullValue<std::int64_t>(vector, value, pos);
    } break;
    case DOUBLE: {
        templateCopyNonNullValue<std::double_t>(vector, value, pos);
    } break;
    case DATE: {
        templateCopyNonNullValue<date_t>(vector, value, pos);
    } break;
    case TIMESTAMP: {
        templateCopyNonNullValue<timestamp_t>(vector, value, pos);
    } break;
    case INTERVAL: {
        templateCopyNonNullValue<interval_t>(vector, value, pos);
    } break;
    case STRING: {
        templateCopyNonNullValue<std::string>(vector, value, pos);
    } break;
    default: {
        throw RuntimeException("Data type " + Types::dataTypeToString(value->dataType) +
                               " is not supported for arrow exportation.");
    }
    }
}

// Marks slot `pos` of `vector` as null by clearing its validity bit and
// bumps the vector's null count. The data slot itself is left untouched.
// `value` is unused; it is kept so all copy routines share one signature.
template<typename T>
void ArrowRowBatch::templateCopyNullValue(ArrowVector* vector, Value* value, std::int64_t pos) {
    // Index by the `pos` argument rather than re-reading vector->numValues,
    // so the slot that is marked is always the slot the caller asked for.
    setBitToZero(vector->validity.data(), pos);
    vector->numNulls++;
}

// String specialization: a null string contributes no bytes, so its end
// offset equals its start offset; the validity bit for `pos` is then cleared
// and the null count bumped.
// `value` is unused; it is kept so all copy routines share one signature.
template<>
void ArrowRowBatch::templateCopyNullValue<std::string>(
    ArrowVector* vector, Value* value, std::int64_t pos) {
    auto offsets = (std::uint32_t*)vector->data.data();
    offsets[pos + 1] = offsets[pos];
    // Use the same index for the validity bit as for the offsets above.
    setBitToZero(vector->validity.data(), pos);
    vector->numNulls++;
}

void ArrowRowBatch::copyNullValue(ArrowVector* vector, Value* value, std::int64_t pos) {
switch (value->dataType.typeID) {
case BOOL: {
templateCopyNullValue<bool>(vector, value, pos);
} break;
case INT64: {
((std::int64_t*)vector->data.data())[pos] = value->val.int64Val;
templateCopyNullValue<std::int64_t>(vector, value, pos);
} break;
case DOUBLE: {
((std::double_t*)vector->data.data())[pos] = value->val.doubleVal;
templateCopyNullValue<std::double_t>(vector, value, pos);
} break;
case DATE: {
((date_t*)vector->data.data())[pos] = value->val.dateVal;
templateCopyNullValue<date_t>(vector, value, pos);
} break;
case TIMESTAMP: {
((timestamp_t*)vector->data.data())[pos] = value->val.timestampVal;
templateCopyNullValue<timestamp_t>(vector, value, pos);
} break;
case INTERVAL: {
((interval_t*)vector->data.data())[pos] = value->val.intervalVal;
templateCopyNullValue<interval_t>(vector, value, pos);
} break;
case STRING: {
templateCopyNullValue<std::string>(vector, value, pos);
} break;
default: {
throw RuntimeException("Data type " + Types::dataTypeToString(value->dataType) +
Expand All @@ -85,9 +199,8 @@ static void releaseArrowVector(ArrowArray* array) {
delete holder;
}

ArrowArray* ArrowRowBatch::finalizeArrowChild(const DataType& type, ArrowVector& vector) {
static unique_ptr<ArrowArray> createArrayFromVector(ArrowVector& vector) {
auto result = make_unique<ArrowArray>();

result->private_data = nullptr;
result->release = releaseArrowVector;
result->n_children = 0;
Expand All @@ -96,52 +209,93 @@ ArrowArray* ArrowRowBatch::finalizeArrowChild(const DataType& type, ArrowVector&
result->buffers = vector.buffers.data();
result->null_count = vector.numNulls;
result->length = vector.numValues;
result->n_buffers = 2;
result->buffers[0] = vector.validity.data();
result->buffers[1] = vector.data.data();
result->n_buffers = 2;
return std::move(result);
}

// Builds an ArrowArray from `vector` via createArrayFromVector(), stores it
// in vector.array, and returns a non-owning pointer to it.
template<typename T>
ArrowArray* ArrowRowBatch::templateCreateArray(ArrowVector& vector) {
    vector.array = createArrayFromVector(vector);
    return vector.array.get();
}

ArrowArray ArrowRowBatch::finalize() {
// String specialization: in addition to what createArrayFromVector() sets up,
// exposes the overflow buffer as buffers[2] and reports three buffers. The
// array is stored in vector.array and a non-owning pointer is returned.
template<>
ArrowArray* ArrowRowBatch::templateCreateArray<string>(ArrowVector& vector) {
    auto array = createArrayFromVector(vector);
    array->buffers[2] = vector.overflow.data();
    array->n_buffers = 3;
    vector.array = std::move(array);
    return vector.array.get();
}

// Creates the ArrowArray for `vector` using the templateCreateArray
// instantiation that matches type.typeID, and returns the resulting pointer.
// Throws RuntimeException for a type id that has no case below.
ArrowArray* ArrowRowBatch::convertVectorToArray(const DataType& type, ArrowVector& vector) {
    ArrowArray* array = nullptr;
    switch (type.typeID) {
    case BOOL:
        array = templateCreateArray<bool>(vector);
        break;
    case INT64:
        array = templateCreateArray<std::int64_t>(vector);
        break;
    case DOUBLE:
        array = templateCreateArray<std::double_t>(vector);
        break;
    case DATE:
        array = templateCreateArray<date_t>(vector);
        break;
    case TIMESTAMP:
        array = templateCreateArray<timestamp_t>(vector);
        break;
    case INTERVAL:
        array = templateCreateArray<interval_t>(vector);
        break;
    case STRING:
        array = templateCreateArray<string>(vector);
        break;
    default:
        throw RuntimeException("Data type " + Types::dataTypeToString(type) +
                               " is not supported for arrow exportation.");
    }
    return array;
}

// Packages the batch as a root ArrowArray with one child array per column.
// The root has `numTuples` rows and no data of its own. The column vectors
// are moved out of this batch into a heap-allocated holder that is handed to
// the array as private_data, with releaseArrowVector as its release callback.
ArrowArray ArrowRowBatch::toArray() {
    auto rootHolder = make_unique<ArrowVector>();
    ArrowArray result;
    rootHolder->childPointers.resize(types.size());
    result.children = rootHolder->childPointers.data();
    result.n_children = (std::int64_t)types.size();
    result.length = numTuples;
    result.n_buffers = 1;
    result.buffers = rootHolder->buffers.data(); // no actual buffer
    result.offset = 0;
    result.null_count = 0;
    result.dictionary = nullptr;
    // After this move `vectors` no longer owns the column vectors.
    rootHolder->childData = std::move(vectors);
    for (auto i = 0u; i < rootHolder->childData.size(); i++) {
        rootHolder->childPointers[i] = convertVectorToArray(types[i], *rootHolder->childData[i]);
    }

    // Ownership of the holder passes to the array from here on.
    result.private_data = rootHolder.release();
    result.release = releaseArrowVector;
    return result;
}

// Pulls up to `chunkSize` tuples from `queryResult`, appending the value of
// each column to the matching column vector, then adds the number of tuples
// read to numTuples and returns the batch as an ArrowArray via toArray().
// Stops early when queryResult.hasNext() reports no more tuples.
// NOTE(review): toArray() appears to move the column vectors out of this
// batch — confirm whether append() is meant to be called more than once.
ArrowArray ArrowRowBatch::append(main::QueryResult& queryResult, std::int64_t chunkSize) {
    std::int64_t numTuplesInBatch = 0;
    auto numColumns = queryResult.getColumnNames().size();
    while (numTuplesInBatch < chunkSize) {
        if (!queryResult.hasNext()) {
            break;
        }
        auto tuple = queryResult.getNext();
        for (auto i = 0u; i < numColumns; i++) {
            // appendValue() advances each vector's own value count.
            appendValue(vectors[i].get(), tuple->getValue(i));
        }
        numTuplesInBatch++;
    }
    numTuples += numTuplesInBatch;
    return toArray();
}
32 changes: 24 additions & 8 deletions src/include/common/arrow/arrow_row_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace common {

// An Arrow Vector(i.e., Array) is defined by a few pieces of metadata and data:
// 1) a logical data type;
// 2) a sequence of buffers: validity bitmaps, data buffer, offsets(optional), children(optional).
// 2) a sequence of buffers: validity bitmaps, data buffer, overflow(optional), children(optional).
// 3) a length as a 64-bit signed integer;
// 4) a null count as a 64-bit signed integer;
// 5) an optional dictionary for dictionary-encoded arrays.
Expand All @@ -28,6 +28,7 @@ static inline std::int64_t getNumBytesForBits(std::int64_t numBits) {
struct ArrowVector {
ArrowBuffer data;
ArrowBuffer validity;
ArrowBuffer overflow;

std::int64_t numValues = 0;
std::int64_t numNulls = 0;
Expand All @@ -49,17 +50,32 @@ class ArrowRowBatch {
ArrowArray append(main::QueryResult& queryResult, std::int64_t chunkSize);

private:
static void initializeArrowVector(
ArrowVector* vector, const DataType& type, std::int64_t capacity);
static void setValue(ArrowVector* vector, Value* value, std::int64_t pos);
static ArrowArray* finalizeArrowChild(const DataType& type, ArrowVector& rowBatch);

ArrowArray finalize();
static std::unique_ptr<ArrowVector> createVector(const DataType& type, std::int64_t capacity);
static void appendValue(ArrowVector* vector, Value* value);

static ArrowArray* convertVectorToArray(const DataType& type, ArrowVector& vector);
// Sizes `validity` to getNumBytesForBits(capacity) bytes, filling each byte
// with 0xFF (every bit set).
static inline void initializeNullBits(ArrowBuffer& validity, std::int64_t capacity) {
    validity.resize(getNumBytesForBits(capacity), 0xFF);
}
static void copyNonNullValue(ArrowVector* vector, Value* value, std::int64_t pos);
static void copyNullValue(ArrowVector* vector, Value* value, std::int64_t pos);

template<typename T>
static void templateInitializeVector(ArrowVector* vector, std::int64_t capacity);
template<typename T>
static void templateCopyNonNullValue(ArrowVector* vector, Value* value, std::int64_t pos);
template<typename T>
static void templateCopyNullValue(ArrowVector* vector, Value* value, std::int64_t pos);
template<typename T>
static ArrowArray* templateCreateArray(ArrowVector& vector);

ArrowArray toArray();

private:
std::vector<DataType> types;
std::vector<std::unique_ptr<ArrowVector>> vectors;
std::int64_t numRows = 0;
std::int64_t numTuples;
};

} // namespace common
Expand Down
Loading

0 comments on commit 9cc5578

Please sign in to comment.