Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet loader fix #1914

Merged
merged 1 commit into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.6.4 LANGUAGES CXX)
project(Kuzu VERSION 0.0.6.5 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
auto listEntry = list_entry_t{size, listSize};
bool needResizeDataVector = size + listSize > capacity;
while (size + listSize > capacity) {
capacity *= 2;
capacity *= VAR_LIST_RESIZE_RATIO;
}
if (needResizeDataVector) {
resizeDataVector(dataVector.get());
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ constexpr uint64_t THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS = 500;

constexpr uint64_t DEFAULT_CHECKPOINT_WAIT_TIMEOUT_FOR_TRANSACTIONS_TO_LEAVE_IN_MICROS = 5000000;

constexpr uint64_t VAR_LIST_RESIZE_RATIO = 2;

struct InternalKeyword {
static constexpr char ANONYMOUS[] = "";
static constexpr char ID[] = "_ID";
Expand Down
11 changes: 5 additions & 6 deletions src/include/storage/copier/string_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class StringColumnChunk : public ColumnChunk {
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
common::page_idx_t flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx) final;

template<typename T>
void setValueFromString(const char* value, uint64_t length, uint64_t pos) {
Expand All @@ -26,13 +25,13 @@ class StringColumnChunk : public ColumnChunk {
throw common::NotImplementedException("VarSizedColumnChunk::getValue");
}

protected:
inline common::page_idx_t getNumPagesForBuffer() const final {
auto numPagesForOffsets = ColumnChunk::getNumPagesForBuffer();
return numPagesForOffsets + overflowFile->getNumPages();
}
common::page_idx_t flushOverflowBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);

private:
// Total number of pages for this chunk: the pages reported by the base ColumnChunk
// plus the pages currently held by the overflow file.
inline common::page_idx_t getNumPages() const final {
    const auto numPagesInBase = ColumnChunk::getNumPages();
    const auto numPagesInOverflow = overflowFile->getNumPages();
    return numPagesInBase + numPagesInOverflow;
}

template<typename T>
void templateCopyVarSizedValuesFromString(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
Expand Down
35 changes: 31 additions & 4 deletions src/include/storage/copier/var_list_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "arrow/array/array_nested.h"
#include "storage/copier/column_chunk.h"

using namespace kuzu::common;
Expand All @@ -12,11 +13,13 @@ struct VarListDataColumnChunk {
uint64_t numValuesInDataChunk;
uint64_t capacityInDataChunk;

VarListDataColumnChunk(std::unique_ptr<ColumnChunk> dataChunk)
explicit VarListDataColumnChunk(std::unique_ptr<ColumnChunk> dataChunk)
: dataChunk{std::move(dataChunk)}, numValuesInDataChunk{0},
capacityInDataChunk{StorageConstants::NODE_GROUP_SIZE} {}

void reset();

void resize(uint64_t numValues);
};

class VarListColumnChunk : public ColumnChunk {
Expand All @@ -38,18 +41,42 @@ class VarListColumnChunk : public ColumnChunk {

void append(arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void copyVarListFromArrowString(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);

// Copies `numValuesToAppend` lists from an Arrow list array into this chunk.
//
// T is the concrete Arrow list array type (the callers in this file use arrow::ListArray and
// arrow::LargeListArray). `array` is indexed from 0, while `startPosInChunk` is the position
// in THIS chunk where the first copied list is written.
//
// For every list the null bit is copied and the running end offset of the list inside the
// data chunk is stored at the list's slot; the list elements themselves are then appended
// to the data chunk in a single call.
template<typename T>
void copyVarListFromArrowList(
    arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
    auto listArray = static_cast<T*>(array);
    // Elements copied by this call land after everything already in the data chunk.
    auto dataChunkOffsetToAppend = varListDataColumnChunk.numValuesInDataChunk;
    for (auto i = 0u; i < numValuesToAppend; i++) {
        nullChunk->setNull(i + startPosInChunk, listArray->IsNull(i));
        auto length = listArray->value_length(i);
        varListDataColumnChunk.numValuesInDataChunk += length;
        setValue(varListDataColumnChunk.numValuesInDataChunk, i + startPosInChunk);
    }
    // The source array must be indexed by source position (0 .. numValuesToAppend), exactly as
    // the loop above does. Indexing it with startPosInChunk (a destination position) yields the
    // wrong element count, or reads past the offsets buffer, whenever startPosInChunk != 0.
    auto startOffset = listArray->value_offset(0);
    auto endOffset = listArray->value_offset(numValuesToAppend);
    varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk);
    // NOTE(review): values() is passed unsliced; if `array` can be a slice with a non-zero
    // first offset, confirm that dataChunk->append starts reading at the right element.
    varListDataColumnChunk.dataChunk->append(
        listArray->values().get(), dataChunkOffsetToAppend, endOffset - startOffset);
}

void write(const common::Value& listVal, uint64_t posToWrite) override;

void resizeDataChunk(uint64_t numValues);

private:
VarListDataColumnChunk varListDataColumnChunk;

// Number of elements in the list stored at `offset`: the distance between the start offset
// of the following list and the start offset of this one.
inline uint64_t getListLen(common::offset_t offset) const {
    const auto listBegin = getListOffset(offset);
    const auto listEnd = getListOffset(offset + 1);
    return listEnd - listBegin;
}

// Start offset of the list at `offset`. The first list starts at 0; every other list starts
// at the value stored in the preceding slot (read as uint64_t).
inline offset_t getListOffset(common::offset_t offset) const {
    if (offset == 0) {
        return 0;
    }
    return getValue<uint64_t>(offset - 1);
}
};

} // namespace storage
Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/storage_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10}, {"0.0.6", 9},
{"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4},
{"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
return {{"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10},
{"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5},
{"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
Expand Down
10 changes: 8 additions & 2 deletions src/include/storage/store/string_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ class StringNodeColumn : public NodeColumn {
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;

common::page_idx_t append(ColumnChunk* columnChunk, common::page_idx_t startPageIdx,
common::node_group_idx_t nodeGroupIdx) final;

void checkpointInMemory() final;
void rollbackInMemory() final;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
Expand All @@ -27,10 +33,10 @@ class StringNodeColumn : public NodeColumn {

private:
void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr,
common::ValueVector* resultVector, common::page_idx_t chunkStartPageIdx);
common::ValueVector* resultVector, common::node_group_idx_t nodeGroupIdx);

private:
common::page_idx_t ovfPageIdxInChunk;
std::unique_ptr<InMemDiskArray<ColumnChunkMetadata>> overflowMetadataDA;
};

} // namespace storage
Expand Down
14 changes: 6 additions & 8 deletions src/storage/copier/string_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void StringColumnChunk::resetToEmpty() {

void StringColumnChunk::append(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
assert(array->type_id() == arrow::Type::STRING || array->type_id() == arrow::Type::LIST);
assert(array->type_id() == arrow::Type::STRING);
switch (array->type_id()) {
case arrow::Type::STRING: {
switch (dataType.getLogicalTypeID()) {
Expand All @@ -76,7 +76,7 @@ void StringColumnChunk::append(

void StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk,
offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto otherChunk = dynamic_cast<StringColumnChunk*>(other);
auto otherChunk = reinterpret_cast<StringColumnChunk*>(other);
nullChunk->append(
otherChunk->getNullChunk(), startPosInOtherChunk, startPosInChunk, numValuesToAppend);
switch (dataType.getLogicalTypeID()) {
Expand All @@ -91,21 +91,19 @@ void StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk
}
}

page_idx_t StringColumnChunk::flushBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
ColumnChunk::flushBuffer(dataFH, startPageIdx);
startPageIdx += ColumnChunk::getNumPagesForBuffer();
page_idx_t StringColumnChunk::flushOverflowBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
for (auto i = 0u; i < overflowFile->getNumPages(); i++) {
FileUtils::writeToFile(dataFH->getFileInfo(), overflowFile->getPage(i)->data,
BufferPoolConstants::PAGE_4KB_SIZE, startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE);
startPageIdx++;
}
return getNumPagesForBuffer();
return overflowFile->getNumPages();
}

void StringColumnChunk::appendStringColumnChunk(StringColumnChunk* other,
offset_t startPosInOtherChunk, offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto otherKuVals = (ku_string_t*)(other->buffer.get());
auto kuVals = (ku_string_t*)(buffer.get());
auto otherKuVals = reinterpret_cast<ku_string_t*>(other->buffer.get());
auto kuVals = reinterpret_cast<ku_string_t*>(buffer.get());
for (auto i = 0u; i < numValuesToAppend; i++) {
auto posInChunk = i + startPosInChunk;
auto posInOtherChunk = i + startPosInOtherChunk;
Expand Down
67 changes: 35 additions & 32 deletions src/storage/copier/var_list_column_chunk.cpp
acquamarin marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@
numValuesInDataChunk = 0;
}

// Ensures the data chunk can hold at least `numValues` values. Capacity only ever grows, by
// repeated multiplication with VAR_LIST_RESIZE_RATIO, and the underlying chunk is resized
// once after the final capacity is known.
void VarListDataColumnChunk::resize(uint64_t numValues) {
    // Nothing to do while the request still fits.
    if (capacityInDataChunk >= numValues) {
        return;
    }
    do {
        capacityInDataChunk *= VAR_LIST_RESIZE_RATIO;
    } while (capacityInDataChunk < numValues);
    dataChunk->resize(capacityInDataChunk);
}

VarListColumnChunk::VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription)
: ColumnChunk{std::move(dataType), copyDescription, true /* hasNullChunk */},
varListDataColumnChunk{ColumnChunkFactory::createColumnChunk(
Expand All @@ -20,20 +30,42 @@

void VarListColumnChunk::append(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
assert(array->type_id() == arrow::Type::STRING || array->type_id() == arrow::Type::LIST);
assert(array->type_id() == arrow::Type::STRING || array->type_id() == arrow::Type::LIST ||
array->type_id() == arrow::Type::LARGE_LIST);
switch (array->type_id()) {
case arrow::Type::STRING: {
copyVarListFromArrowString(array, startPosInChunk, numValuesToAppend);
} break;
case arrow::Type::LIST: {
copyVarListFromArrowList(array, startPosInChunk, numValuesToAppend);
copyVarListFromArrowList<arrow::ListArray>(array, startPosInChunk, numValuesToAppend);
} break;
case arrow::Type::LARGE_LIST: {
copyVarListFromArrowList<arrow::LargeListArray>(array, startPosInChunk, numValuesToAppend);

Check warning on line 43 in src/storage/copier/var_list_column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/var_list_column_chunk.cpp#L42-L43

Added lines #L42 - L43 were not covered by tests
} break;
default: {
throw NotImplementedException("ListColumnChunk::appendArray");
}
}
}

// Appends `numValuesToAppend` lists from `other`, starting at `startPosInOtherChunk`, into this
// chunk starting at `startPosInChunk`. Null bits, per-list end offsets and the list elements
// held in the data chunk are all copied.
// `other` is treated as a VarListColumnChunk; its dynamic type is not checked here.
void VarListColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk,
    offset_t startPosInChunk, uint32_t numValuesToAppend) {
    nullChunk->append(
        other->getNullChunk(), startPosInOtherChunk, startPosInChunk, numValuesToAppend);
    // A downcast inside a class hierarchy is a static_cast: unlike reinterpret_cast it applies
    // any base-to-derived pointer adjustment, and it costs nothing at runtime.
    auto otherListChunk = static_cast<VarListColumnChunk*>(other);
    // Elements copied by this call land after everything already in the data chunk.
    auto offsetInDataChunkToAppend = varListDataColumnChunk.numValuesInDataChunk;
    for (auto i = 0u; i < numValuesToAppend; i++) {
        // Each slot stores the running end offset of its list inside the data chunk.
        varListDataColumnChunk.numValuesInDataChunk +=
            otherListChunk->getListLen(startPosInOtherChunk + i);
        setValue(varListDataColumnChunk.numValuesInDataChunk, startPosInChunk + i);
    }
    // Element range in the other chunk's data chunk that backs the copied lists.
    auto startOffset = otherListChunk->getListOffset(startPosInOtherChunk);
    auto endOffset = otherListChunk->getListOffset(startPosInOtherChunk + numValuesToAppend);
    // Grow the data chunk first so the element copy below has room.
    varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk);
    varListDataColumnChunk.dataChunk->append(otherListChunk->varListDataColumnChunk.dataChunk.get(),
        startOffset, offsetInDataChunkToAppend, endOffset - startOffset);
}

void VarListColumnChunk::copyVarListFromArrowString(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto stringArray = (arrow::StringArray*)array;
Expand Down Expand Up @@ -62,29 +94,10 @@
}
}

void VarListColumnChunk::copyVarListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
// auto listArray = (arrow::ListArray*)array;
// auto offsetArray = listArray->offsets()->Slice(1 /* offset */);
// append(offsetArray.get(), startPosInChunk, numValuesToAppend);
// if (offsetArray->data()->MayHaveNulls()) {
// for (auto i = 0u; i < numValuesToAppend; i++) {
// nullChunk->setNull(i + startPosInChunk, offsetArray->data()->IsNull(i));
// }
// } else {
// nullChunk->setRangeNoNull(startPosInChunk, numValuesToAppend);
// }
// auto startOffset = listArray->value_offset(startPosInChunk);
// auto endOffset = listArray->value_offset(startPosInChunk + numValuesToAppend);
// varListDataColumnChunk.dataChunk->resize(endOffset - startOffset);
// varListDataColumnChunk.dataChunk->append(
// listArray->offsets().get(), startOffset, endOffset - startOffset);
}

void VarListColumnChunk::write(const Value& listVal, uint64_t posToWrite) {
assert(listVal.getDataType()->getPhysicalType() == PhysicalTypeID::VAR_LIST);
auto numValuesInList = NestedVal::getChildrenSize(&listVal);
resizeDataChunk(varListDataColumnChunk.numValuesInDataChunk + numValuesInList);
varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk + numValuesInList);
for (auto i = 0u; i < numValuesInList; i++) {
varListDataColumnChunk.dataChunk->write(
*NestedVal::getChildVal(&listVal, i), varListDataColumnChunk.numValuesInDataChunk);
Expand All @@ -104,15 +117,5 @@
varListDataColumnChunk.reset();
}

// Ensures the data chunk can hold at least `numValues` values.
// Delegates to VarListDataColumnChunk::resize so the growth policy (VAR_LIST_RESIZE_RATIO)
// lives in one place instead of being duplicated here with a hard-coded factor of 2.
void VarListColumnChunk::resizeDataChunk(uint64_t numValues) {
    varListDataColumnChunk.resize(numValues);
}

} // namespace storage
} // namespace kuzu
4 changes: 4 additions & 0 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ std::unique_ptr<MetadataDAHInfo> StorageManager::createMetadataDAHInfo(
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*VarListType::getChildType(&dataType)));
} break;
case PhysicalTypeID::STRING: {
auto dummyChildType = LogicalType{LogicalTypeID::ANY};
metadataDAHInfo->childrenInfos.push_back(createMetadataDAHInfo(dummyChildType));
} break;
default: {
// DO NOTHING.
}
Expand Down
Loading