Skip to content

Commit

Permalink
Fix parquet-loader
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Aug 10, 2023
1 parent 7dd607b commit eb52261
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 101 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.6.4 LANGUAGES CXX)
project(Kuzu VERSION 0.0.6.5 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
auto listEntry = list_entry_t{size, listSize};
bool needResizeDataVector = size + listSize > capacity;
while (size + listSize > capacity) {
capacity *= 2;
capacity *= VAR_LIST_RESIZE_RATIO;
}
if (needResizeDataVector) {
resizeDataVector(dataVector.get());
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ constexpr uint64_t THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS = 500;

constexpr uint64_t DEFAULT_CHECKPOINT_WAIT_TIMEOUT_FOR_TRANSACTIONS_TO_LEAVE_IN_MICROS = 5000000;

constexpr uint64_t VAR_LIST_RESIZE_RATIO = 2;

struct InternalKeyword {
static constexpr char ANONYMOUS[] = "";
static constexpr char ID[] = "_ID";
Expand Down
11 changes: 5 additions & 6 deletions src/include/storage/copier/string_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class StringColumnChunk : public ColumnChunk {
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
common::page_idx_t flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx) final;

template<typename T>
void setValueFromString(const char* value, uint64_t length, uint64_t pos) {
Expand All @@ -26,13 +25,13 @@ class StringColumnChunk : public ColumnChunk {
throw common::NotImplementedException("VarSizedColumnChunk::getValue");
}

protected:
inline common::page_idx_t getNumPagesForBuffer() const final {
auto numPagesForOffsets = ColumnChunk::getNumPagesForBuffer();
return numPagesForOffsets + overflowFile->getNumPages();
}
common::page_idx_t flushOverflowBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);

private:
inline common::page_idx_t getNumPages() const final {
return ColumnChunk::getNumPages() + overflowFile->getNumPages();
}

template<typename T>
void templateCopyVarSizedValuesFromString(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
Expand Down
35 changes: 31 additions & 4 deletions src/include/storage/copier/var_list_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "arrow/array/array_nested.h"
#include "storage/copier/column_chunk.h"

using namespace kuzu::common;
Expand All @@ -12,11 +13,13 @@ struct VarListDataColumnChunk {
uint64_t numValuesInDataChunk;
uint64_t capacityInDataChunk;

VarListDataColumnChunk(std::unique_ptr<ColumnChunk> dataChunk)
explicit VarListDataColumnChunk(std::unique_ptr<ColumnChunk> dataChunk)
: dataChunk{std::move(dataChunk)}, numValuesInDataChunk{0},
capacityInDataChunk{StorageConstants::NODE_GROUP_SIZE} {}

void reset();

void resize(uint64_t numValues);
};

class VarListColumnChunk : public ColumnChunk {
Expand All @@ -38,18 +41,42 @@ class VarListColumnChunk : public ColumnChunk {

void append(arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void copyVarListFromArrowString(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);

template<typename T>
void copyVarListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto listArray = (T*)array;
auto dataChunkOffsetToAppend = varListDataColumnChunk.numValuesInDataChunk;
for (auto i = 0u; i < numValuesToAppend; i++) {
nullChunk->setNull(i + startPosInChunk, listArray->IsNull(i));
auto length = listArray->value_length(i);
varListDataColumnChunk.numValuesInDataChunk += length;
setValue(varListDataColumnChunk.numValuesInDataChunk, i + startPosInChunk);
}
auto startOffset = listArray->value_offset(startPosInChunk);
auto endOffset = listArray->value_offset(startPosInChunk + numValuesToAppend);
varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk);
varListDataColumnChunk.dataChunk->append(
listArray->values().get(), dataChunkOffsetToAppend, endOffset - startOffset);
}

void write(const common::Value& listVal, uint64_t posToWrite) override;

void resizeDataChunk(uint64_t numValues);

private:
VarListDataColumnChunk varListDataColumnChunk;

inline uint64_t getListLen(common::offset_t offset) const {
return getListOffset(offset + 1) - getListOffset(offset);
}

inline offset_t getListOffset(common::offset_t offset) const {
return offset == 0 ? 0 : getValue<uint64_t>(offset - 1);
}
};

} // namespace storage
Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/storage_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10}, {"0.0.6", 9},
{"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4},
{"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
return {{"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10},
{"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5},
{"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
Expand Down
10 changes: 8 additions & 2 deletions src/include/storage/store/string_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ class StringNodeColumn : public NodeColumn {
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;

common::page_idx_t append(ColumnChunk* columnChunk, common::page_idx_t startPageIdx,
common::node_group_idx_t nodeGroupIdx) final;

void checkpointInMemory() final;
void rollbackInMemory() final;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
Expand All @@ -27,10 +33,10 @@ class StringNodeColumn : public NodeColumn {

private:
void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr,
common::ValueVector* resultVector, common::page_idx_t chunkStartPageIdx);
common::ValueVector* resultVector, common::node_group_idx_t nodeGroupIdx);

private:
common::page_idx_t ovfPageIdxInChunk;
std::unique_ptr<InMemDiskArray<ColumnChunkMetadata>> overflowMetadataDA;
};

} // namespace storage
Expand Down
14 changes: 6 additions & 8 deletions src/storage/copier/string_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void StringColumnChunk::resetToEmpty() {

void StringColumnChunk::append(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
assert(array->type_id() == arrow::Type::STRING || array->type_id() == arrow::Type::LIST);
assert(array->type_id() == arrow::Type::STRING);
switch (array->type_id()) {
case arrow::Type::STRING: {
switch (dataType.getLogicalTypeID()) {
Expand All @@ -76,7 +76,7 @@ void StringColumnChunk::append(

void StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk,
offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto otherChunk = dynamic_cast<StringColumnChunk*>(other);
auto otherChunk = reinterpret_cast<StringColumnChunk*>(other);
nullChunk->append(
otherChunk->getNullChunk(), startPosInOtherChunk, startPosInChunk, numValuesToAppend);
switch (dataType.getLogicalTypeID()) {
Expand All @@ -91,21 +91,19 @@ void StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk
}
}

page_idx_t StringColumnChunk::flushBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
ColumnChunk::flushBuffer(dataFH, startPageIdx);
startPageIdx += ColumnChunk::getNumPagesForBuffer();
page_idx_t StringColumnChunk::flushOverflowBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
for (auto i = 0u; i < overflowFile->getNumPages(); i++) {
FileUtils::writeToFile(dataFH->getFileInfo(), overflowFile->getPage(i)->data,
BufferPoolConstants::PAGE_4KB_SIZE, startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE);
startPageIdx++;
}
return getNumPagesForBuffer();
return overflowFile->getNumPages();
}

void StringColumnChunk::appendStringColumnChunk(StringColumnChunk* other,
offset_t startPosInOtherChunk, offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto otherKuVals = (ku_string_t*)(other->buffer.get());
auto kuVals = (ku_string_t*)(buffer.get());
auto otherKuVals = reinterpret_cast<ku_string_t*>(other->buffer.get());
auto kuVals = reinterpret_cast<ku_string_t*>(buffer.get());
for (auto i = 0u; i < numValuesToAppend; i++) {
auto posInChunk = i + startPosInChunk;
auto posInOtherChunk = i + startPosInOtherChunk;
Expand Down
67 changes: 35 additions & 32 deletions src/storage/copier/var_list_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ void VarListDataColumnChunk::reset() {
numValuesInDataChunk = 0;
}

void VarListDataColumnChunk::resize(uint64_t numValues) {
if (numValues <= capacityInDataChunk) {
return;
}
while (capacityInDataChunk < numValues) {
capacityInDataChunk *= VAR_LIST_RESIZE_RATIO;
}
dataChunk->resize(capacityInDataChunk);
}

VarListColumnChunk::VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription)
: ColumnChunk{std::move(dataType), copyDescription, true /* hasNullChunk */},
varListDataColumnChunk{ColumnChunkFactory::createColumnChunk(
Expand All @@ -20,20 +30,42 @@ VarListColumnChunk::VarListColumnChunk(LogicalType dataType, CopyDescription* co

void VarListColumnChunk::append(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
assert(array->type_id() == arrow::Type::STRING || array->type_id() == arrow::Type::LIST);
assert(array->type_id() == arrow::Type::STRING || array->type_id() == arrow::Type::LIST ||
array->type_id() == arrow::Type::LARGE_LIST);
switch (array->type_id()) {
case arrow::Type::STRING: {
copyVarListFromArrowString(array, startPosInChunk, numValuesToAppend);
} break;
case arrow::Type::LIST: {
copyVarListFromArrowList(array, startPosInChunk, numValuesToAppend);
copyVarListFromArrowList<arrow::ListArray>(array, startPosInChunk, numValuesToAppend);
} break;
case arrow::Type::LARGE_LIST: {
copyVarListFromArrowList<arrow::LargeListArray>(array, startPosInChunk, numValuesToAppend);

Check warning on line 43 in src/storage/copier/var_list_column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/var_list_column_chunk.cpp#L42-L43

Added lines #L42 - L43 were not covered by tests
} break;
default: {
throw NotImplementedException("ListColumnChunk::appendArray");
}
}
}

void VarListColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk,
offset_t startPosInChunk, uint32_t numValuesToAppend) {
nullChunk->append(
other->getNullChunk(), startPosInOtherChunk, startPosInChunk, numValuesToAppend);
auto otherListChunk = reinterpret_cast<VarListColumnChunk*>(other);
auto offsetInDataChunkToAppend = varListDataColumnChunk.numValuesInDataChunk;
for (auto i = 0u; i < numValuesToAppend; i++) {
varListDataColumnChunk.numValuesInDataChunk +=
otherListChunk->getListLen(startPosInOtherChunk + i);
setValue(varListDataColumnChunk.numValuesInDataChunk, startPosInChunk + i);
}
auto startOffset = otherListChunk->getListOffset(startPosInOtherChunk);
auto endOffset = otherListChunk->getListOffset(startPosInOtherChunk + numValuesToAppend);
varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk);
varListDataColumnChunk.dataChunk->append(otherListChunk->varListDataColumnChunk.dataChunk.get(),
startOffset, offsetInDataChunkToAppend, endOffset - startOffset);
}

void VarListColumnChunk::copyVarListFromArrowString(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto stringArray = (arrow::StringArray*)array;
Expand Down Expand Up @@ -62,29 +94,10 @@ void VarListColumnChunk::copyVarListFromArrowString(
}
}

void VarListColumnChunk::copyVarListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
// auto listArray = (arrow::ListArray*)array;
// auto offsetArray = listArray->offsets()->Slice(1 /* offset */);
// append(offsetArray.get(), startPosInChunk, numValuesToAppend);
// if (offsetArray->data()->MayHaveNulls()) {
// for (auto i = 0u; i < numValuesToAppend; i++) {
// nullChunk->setNull(i + startPosInChunk, offsetArray->data()->IsNull(i));
// }
// } else {
// nullChunk->setRangeNoNull(startPosInChunk, numValuesToAppend);
// }
// auto startOffset = listArray->value_offset(startPosInChunk);
// auto endOffset = listArray->value_offset(startPosInChunk + numValuesToAppend);
// varListDataColumnChunk.dataChunk->resize(endOffset - startOffset);
// varListDataColumnChunk.dataChunk->append(
// listArray->offsets().get(), startOffset, endOffset - startOffset);
}

void VarListColumnChunk::write(const Value& listVal, uint64_t posToWrite) {
assert(listVal.getDataType()->getPhysicalType() == PhysicalTypeID::VAR_LIST);
auto numValuesInList = NestedVal::getChildrenSize(&listVal);
resizeDataChunk(varListDataColumnChunk.numValuesInDataChunk + numValuesInList);
varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk + numValuesInList);
for (auto i = 0u; i < numValuesInList; i++) {
varListDataColumnChunk.dataChunk->write(
*NestedVal::getChildVal(&listVal, i), varListDataColumnChunk.numValuesInDataChunk);
Expand All @@ -104,15 +117,5 @@ void VarListColumnChunk::resetToEmpty() {
varListDataColumnChunk.reset();
}

void VarListColumnChunk::resizeDataChunk(uint64_t numValues) {
if (numValues <= varListDataColumnChunk.capacityInDataChunk) {
return;
}
while (varListDataColumnChunk.capacityInDataChunk < numValues) {
varListDataColumnChunk.capacityInDataChunk *= 2;
}
varListDataColumnChunk.dataChunk->resize(varListDataColumnChunk.capacityInDataChunk);
}

} // namespace storage
} // namespace kuzu
4 changes: 4 additions & 0 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ std::unique_ptr<MetadataDAHInfo> StorageManager::createMetadataDAHInfo(
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*VarListType::getChildType(&dataType)));
} break;
case PhysicalTypeID::STRING: {
auto dummyChildType = LogicalType{LogicalTypeID::ANY};
metadataDAHInfo->childrenInfos.push_back(createMetadataDAHInfo(dummyChildType));
} break;
default: {
// DO NOTHING.
}
Expand Down
Loading

0 comments on commit eb52261

Please sign in to comment.