Skip to content

Commit

Permalink
Struct valueVector refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed May 1, 2023
1 parent ecb5d46 commit 1c35db3
Show file tree
Hide file tree
Showing 28 changed files with 724 additions and 421 deletions.
11 changes: 10 additions & 1 deletion src/common/null_mask.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include "common/null_mask.h"

namespace kuzu {
#include <cstring>

namespace kuzu {
namespace common {

void NullMask::setNull(uint32_t pos, bool isNull) {
Expand Down Expand Up @@ -78,5 +79,13 @@ bool NullMask::copyNullMask(const uint64_t* srcNullEntries, uint64_t srcOffset,
return hasNullInSrcNullMask;
}

// Replaces the null-entry buffer with a freshly allocated one holding `capacity` 64-bit
// entries and carries the existing entries over. Entries beyond the copied prefix are
// zero, because std::make_unique<T[]> value-initializes the array.
// `data` and `numNullEntries` are updated to describe the new buffer.
void NullMask::resize(uint64_t capacity) {
    auto resizedBuffer = std::make_unique<uint64_t[]>(capacity);
    // Never copy more entries than the new buffer can hold (resize may shrink).
    const uint64_t numEntriesToCopy = capacity < numNullEntries ? capacity : numNullEntries;
    if (numEntriesToCopy > 0) {
        // memcpy takes a byte count, so scale the entry count by the entry width.
        memcpy(resizedBuffer.get(), buffer.get(), numEntriesToCopy * sizeof(uint64_t));
    }
    buffer = std::move(resizedBuffer);
    data = buffer.get();
    numNullEntries = capacity;
}

} // namespace common
} // namespace kuzu
24 changes: 21 additions & 3 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,27 @@ namespace common {

// Builds a vector able to hold DEFAULT_VECTOR_CAPACITY values of `dataType`.
// - dataType: type of the stored values; moved into the vector.
// - memoryManager: backs the in-memory overflow buffer; must be non-null whenever
//   needOverflowBuffer() is true. It is also forwarded to the child data vector of a
//   VAR_LIST vector.
ValueVector::ValueVector(DataType dataType, storage::MemoryManager* memoryManager)
    : dataType{std::move(dataType)} {
    // A VAR_LIST slot stores a ListEntry rather than the type's regular representation,
    // so the per-value width must be fixed before the buffer is sized and must not be
    // overwritten afterwards.
    numBytesPerValue = this->dataType.typeID == VAR_LIST ? sizeof(common::ListEntry) :
                                                           Types::getDataTypeSize(this->dataType);
    // Allocate the value buffer exactly once, using the final per-value width.
    valueBuffer = std::make_unique<uint8_t[]>(numBytesPerValue * DEFAULT_VECTOR_CAPACITY);
    if (needOverflowBuffer()) {
        assert(memoryManager != nullptr);
        inMemOverflowBuffer = std::make_unique<InMemOverflowBuffer>(memoryManager);
    }
    nullMask = std::make_unique<NullMask>();
    if (this->dataType.typeID == VAR_LIST) {
        // The elements of all lists in this vector live in a separate child vector.
        dataVector = std::make_shared<ValueVector>(*this->dataType.getChildType(), memoryManager);
    }
}

void ValueVector::allocateSpaceIfNecessary(uint64_t numElementsToAppend) {
while (size + numElementsToAppend > capacity) {
capacity *= 2;
auto buffer = std::make_unique<uint8_t[]>(capacity * numBytesPerValue);
memcpy(valueBuffer.get(), buffer.get(), size * numBytesPerValue);
valueBuffer = std::move(buffer);
nullMask->resize(capacity);
}
}

void ValueVector::addString(uint32_t pos, char* value, uint64_t len) const {
Expand Down Expand Up @@ -51,6 +64,11 @@ void ValueVector::setValue(uint32_t pos, T val) {
((T*)valueBuffer.get())[pos] = val;
}

// Specialization for list entries: stores `val` in slot `pos` of the value buffer,
// viewing the buffer as an array of ListEntry.
template<>
void ValueVector::setValue(uint32_t pos, ListEntry val) {
    auto* listEntries = (ListEntry*)valueBuffer.get();
    listEntries[pos] = val;
}

template<>
void ValueVector::setValue(uint32_t pos, std::string val) {
addString(pos, val.data(), val.length());
Expand Down
114 changes: 111 additions & 3 deletions src/common/vector/value_vector_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ void ValueVectorUtils::copyNonNullDataWithSameTypeIntoPos(
copyNonNullDataWithSameTypeIntoPos(*childVector, pos, srcData);
srcData += childVector->getNumBytesPerValue();
}
} else if (resultVector.dataType.typeID == VAR_LIST) {
copyKuListToVector(resultVector, pos, *reinterpret_cast<const ku_list_t*>(srcData));
} else {
copyNonNullDataWithSameType(resultVector.dataType, srcData,
resultVector.getData() + pos * resultVector.getNumBytesPerValue(),
Expand All @@ -26,23 +28,129 @@ void ValueVectorUtils::copyNonNullDataWithSameTypeOutFromPos(const ValueVector&
copyNonNullDataWithSameTypeOutFromPos(*childVector, pos, dstData, dstOverflowBuffer);
dstData += childVector->getNumBytesPerValue();
}
} else if (srcVector.dataType.typeID == VAR_LIST) {
auto kuList = ValueVectorUtils::convertListEntryToKuList(srcVector, pos, dstOverflowBuffer);
memcpy(dstData, &kuList, sizeof(kuList));
} else {
copyNonNullDataWithSameType(srcVector.dataType,
srcVector.getData() + pos * srcVector.getNumBytesPerValue(), dstData,
dstOverflowBuffer);
}
}

// Writes the value at `elementPos` of `elementVector` into the next free slot (index
// getSize()) of `listVector`'s data vector. The data vector's size is NOT incremented
// here; that is left to the caller. When the element is itself a list, its entries are
// appended recursively to the nested data vector, whose size is advanced per entry.
void ValueVectorUtils::appendElementToList(
    ValueVector& listVector, ValueVector& elementVector, uint64_t elementPos) {
    auto* childVector = listVector.getDataVector();
    assert(childVector->dataType == elementVector.dataType);

    if (elementVector.dataType.typeID != VAR_LIST) {
        // Plain element: copy its bytes straight into the next free slot.
        auto* srcBytes =
            elementVector.getData() + elementVector.getNumBytesPerValue() * elementPos;
        auto* dstBytes =
            childVector->getData() + childVector->getNumBytesPerValue() * childVector->getSize();
        copyNonNullDataWithSameType(
            childVector->dataType, srcBytes, dstBytes, childVector->getOverflowBuffer());
        return;
    }

    // Nested list: record where its entries will start in the grandchild vector, then
    // append those entries one at a time.
    auto srcEntry = elementVector.getValue<common::ListEntry>(elementPos);
    auto* srcChildVector = elementVector.getDataVector();
    auto* grandChildVector = childVector->getDataVector();
    childVector->setValue<ListEntry>(
        childVector->getSize(), ListEntry{grandChildVector->getSize(), srcEntry.size});
    grandChildVector->allocateSpaceIfNecessary(srcEntry.size);
    for (unsigned int idx = 0; idx < srcEntry.size; ++idx) {
        appendElementToList(*childVector, *srcChildVector, srcEntry.offsetInDataVector + idx);
        grandChildVector->incrementSize(1 /* numElements */);
    }
}

// Copies the element stored at index `elementPosInListToCopy` of `listVector`'s data
// vector into slot `elementPos` of `elementVector`. When the element is itself a list,
// a ListEntry is written to `elementVector` and the nested entries are copied one by one
// to the end of `elementVector`'s data vector, whose size is advanced per entry.
void ValueVectorUtils::copyElementOutFromListVector(ValueVector& listVector,
    uint64_t elementPosInListToCopy, ValueVector& elementVector, uint64_t elementPos) {
    auto* childVector = listVector.getDataVector();
    assert(childVector->dataType == elementVector.dataType);

    if (childVector->dataType.typeID != VAR_LIST) {
        // Plain element: byte-wise copy from the list's data vector to the output slot.
        auto* srcBytes =
            childVector->getData() + childVector->getNumBytesPerValue() * elementPosInListToCopy;
        auto* dstBytes =
            elementVector.getData() + elementVector.getNumBytesPerValue() * elementPos;
        copyNonNullDataWithSameType(
            childVector->dataType, srcBytes, dstBytes, elementVector.getOverflowBuffer());
        return;
    }

    // Nested list: point the output entry at the end of the output data vector, then
    // copy each nested entry there.
    auto srcEntry = childVector->getValue<common::ListEntry>(elementPosInListToCopy);
    auto* dstChildVector = elementVector.getDataVector();
    elementVector.setValue<ListEntry>(
        elementPos, ListEntry{dstChildVector->getSize(), srcEntry.size});
    dstChildVector->allocateSpaceIfNecessary(srcEntry.size);
    for (unsigned int idx = 0; idx < srcEntry.size; ++idx) {
        copyElementOutFromListVector(*childVector, srcEntry.offsetInDataVector + idx,
            *dstChildVector, dstChildVector->getSize());
        dstChildVector->incrementSize(1 /* numElements */);
    }
}

// Copies one value of `dataType` from `srcData` to `dstData`.
// STRING and VAR_LIST values are copied through InMemOverflowBufferUtils, which is handed
// `inMemOverflowBuffer`; every other type is copied byte-wise using the type's size.
// STRUCT values are not supported here.
void ValueVectorUtils::copyNonNullDataWithSameType(const DataType& dataType, const uint8_t* srcData,
    uint8_t* dstData, InMemOverflowBuffer& inMemOverflowBuffer) {
    assert(dataType.typeID != STRUCT);
    switch (dataType.typeID) {
    case STRING: {
        auto& srcString = *(ku_string_t*)srcData;
        auto& dstString = *(ku_string_t*)dstData;
        InMemOverflowBufferUtils::copyString(srcString, dstString, inMemOverflowBuffer);
    } break;
    case VAR_LIST: {
        auto& srcList = *(ku_list_t*)srcData;
        auto& dstList = *(ku_list_t*)dstData;
        InMemOverflowBufferUtils::copyListRecursiveIfNested(
            srcList, dstList, dataType, inMemOverflowBuffer);
    } break;
    default:
        memcpy(dstData, srcData, Types::getDataTypeSize(dataType));
        break;
    }
}

// Materializes the list stored at `pos` of `srcVector` (a ListEntry pointing into the
// vector's data vector) as a ku_list_t whose element storage is allocated from
// `dstOverflowBuffer`. Nested lists are converted recursively; string elements are
// additionally copied through InMemOverflowBufferUtils::copyString.
ku_list_t ValueVectorUtils::convertListEntryToKuList(
    const ValueVector& srcVector, uint64_t pos, InMemOverflowBuffer& dstOverflowBuffer) {
    auto entry = srcVector.getValue<ListEntry>(pos);
    auto& childVector = *srcVector.getDataVector();
    auto srcElements =
        childVector.getData() + childVector.getNumBytesPerValue() * entry.offsetInDataVector;

    ku_list_t result;
    result.size = entry.size;
    InMemOverflowBufferUtils::allocateSpaceForList(result,
        Types::getDataTypeSize(*srcVector.dataType.getChildType()) * result.size,
        dstOverflowBuffer);

    if (srcVector.dataType.getChildType()->typeID == VAR_LIST) {
        // Each element is itself a list: convert it and store the resulting ku_list_t.
        auto* dstLists = reinterpret_cast<ku_list_t*>(result.overflowPtr);
        for (unsigned int idx = 0; idx < result.size; ++idx) {
            auto nestedList = convertListEntryToKuList(
                childVector, entry.offsetInDataVector + idx, dstOverflowBuffer);
            dstLists[idx] = nestedList;
        }
        return result;
    }

    // Flat elements: bulk-copy the fixed-width part first.
    memcpy(reinterpret_cast<uint8_t*>(result.overflowPtr), srcElements,
        childVector.getNumBytesPerValue() * entry.size);
    if (childVector.dataType.getTypeID() == STRING) {
        // Strings are then copied individually through the overflow-buffer helper.
        auto* srcStrings = reinterpret_cast<ku_string_t*>(srcElements);
        auto* dstStrings = reinterpret_cast<ku_string_t*>(result.overflowPtr);
        for (unsigned int idx = 0; idx < result.size; ++idx) {
            InMemOverflowBufferUtils::copyString(
                srcStrings[idx], dstStrings[idx], dstOverflowBuffer);
        }
    }
    return result;
}

// Stores `srcList` at slot `pos` of `dstVector`: the list's elements are appended to the
// end of `dstVector`'s data vector and a ListEntry {start offset, size} is written at
// `pos`. Nested lists are copied recursively; string elements are additionally copied
// through InMemOverflowBufferUtils::copyString into the data vector's overflow buffer.
// The data vector's size grows by srcList.size.
void ValueVectorUtils::copyKuListToVector(
    ValueVector& dstVector, uint64_t pos, const ku_list_t& srcList) {
    auto srcBytes = reinterpret_cast<uint8_t*>(srcList.overflowPtr);
    auto childVector = dstVector.getDataVector();
    // Make room first so the destination addresses computed below stay valid.
    childVector->allocateSpaceIfNecessary(srcList.size);
    dstVector.setValue<ListEntry>(pos, ListEntry{childVector->getSize(), srcList.size});

    const bool elementsAreLists = dstVector.dataType.getChildType()->typeID == VAR_LIST;
    if (!elementsAreLists) {
        auto dstBytes =
            childVector->getData() + childVector->getSize() * childVector->getNumBytesPerValue();
        memcpy(dstBytes, srcBytes, srcList.size * childVector->getNumBytesPerValue());
        if (childVector->dataType.getTypeID() == STRING) {
            auto* srcStrings = reinterpret_cast<ku_string_t*>(srcBytes);
            auto* dstStrings = reinterpret_cast<ku_string_t*>(dstBytes);
            for (unsigned int idx = 0; idx < srcList.size; ++idx) {
                InMemOverflowBufferUtils::copyString(
                    srcStrings[idx], dstStrings[idx], childVector->getOverflowBuffer());
            }
        }
    } else {
        // Element idx lands at the current end of the data vector plus idx; the size is
        // only advanced once, after the loop.
        for (unsigned int idx = 0; idx < srcList.size; ++idx) {
            ValueVectorUtils::copyKuListToVector(*childVector, childVector->getSize() + idx,
                reinterpret_cast<ku_list_t*>(srcList.overflowPtr)[idx]);
        }
    }
    childVector->incrementSize(srcList.size);
}
18 changes: 9 additions & 9 deletions src/function/vector_list_operation.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "common/types/ku_list.h"
#include "common/vector/value_vector_utils.h"
#include "function/list/operations/list_append_operation.h"
#include "function/list/operations/list_concat_operation.h"
#include "function/list/operations/list_contains.h"
Expand Down Expand Up @@ -28,21 +29,20 @@ void ListCreationVectorOperation::execFunc(
auto& childType = parameters[0]->dataType;
auto numBytesOfListElement = Types::getDataTypeSize(childType);
auto elements = std::make_unique<uint8_t[]>(parameters.size() * numBytesOfListElement);
auto dataVector = result.getDataVector();
for (auto selectedPos = 0u; selectedPos < result.state->selVector->selectedSize;
++selectedPos) {
auto pos = result.state->selVector->selectedPositions[selectedPos];
auto& kuList = ((ku_list_t*)result.getData())[pos];
result.setValue(pos, common::ListEntry{dataVector->getSize(), parameters.size()});
dataVector->allocateSpaceIfNecessary(parameters.size());
for (auto paramIdx = 0u; paramIdx < parameters.size(); paramIdx++) {
auto paramPos = parameters[paramIdx]->state->isFlat() ?
parameters[paramIdx]->state->selVector->selectedPositions[0] :
auto parameter = parameters[paramIdx].get();
auto paramPos = parameter->state->isFlat() ?
parameter->state->selVector->selectedPositions[0] :
pos;
memcpy(elements.get() + paramIdx * numBytesOfListElement,
parameters[paramIdx]->getData() + paramPos * numBytesOfListElement,
numBytesOfListElement);
common::ValueVectorUtils::appendElementToList(result, *parameter, paramPos);
dataVector->incrementSize(1 /* numElements */);
}
ku_list_t tmpList(parameters.size(), (uint64_t)elements.get());
InMemOverflowBufferUtils::copyListRecursiveIfNested(
tmpList, kuList, result.dataType, result.getOverflowBuffer());
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/include/common/null_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class NullMask {
static bool copyNullMask(const uint64_t* srcNullEntries, uint64_t srcOffset,
uint64_t* dstNullEntries, uint64_t dstOffset, uint64_t numBitsToCopy);

void resize(uint64_t capacity);

private:
static inline std::pair<uint64_t, uint64_t> getNullEntryAndBitPos(uint64_t pos) {
auto nullEntryPos = pos >> NUM_BITS_PER_NULL_ENTRY_LOG2;
Expand Down
28 changes: 25 additions & 3 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
namespace kuzu {
namespace common {

//! Describes one list stored in a list vector: `size` consecutive elements of the
//! vector's data vector, starting at `offsetInDataVector`.
struct ListEntry {
    // Index of the list's first element inside the data vector.
    common::offset_t offsetInDataVector{0};
    // Number of elements in the list.
    uint64_t size{0};

    // An empty list located at offset 0.
    ListEntry() = default;
    ListEntry(common::offset_t offsetInDataVector, uint64_t size)
        : offsetInDataVector{offsetInDataVector}, size{size} {}
};

//! A Vector represents values of the same data type.
//! The capacity of a ValueVector is either 1 (sequence) or DEFAULT_VECTOR_CAPACITY.
class ValueVector {
Expand Down Expand Up @@ -48,7 +56,6 @@ class ValueVector {
}
template<typename T>
void setValue(uint32_t pos, T val);

void addValue(uint32_t pos, const Value& value);

inline uint8_t* getData() const { return valueBuffer.get(); }
Expand Down Expand Up @@ -77,6 +84,14 @@ class ValueVector {
inline std::shared_ptr<ValueVector> getChildVector(vector_idx_t idx) const {
return childrenVectors[idx];
}
inline ValueVector* getDataVector() const {
assert(dataType.typeID == VAR_LIST);
return dataVector.get();
}
inline uint64_t getSize() const { return size; }
inline void incrementSize(uint64_t numElements) { size += numElements; }

void allocateSpaceIfNecessary(uint64_t numElementsToAppend);

private:
inline bool needOverflowBuffer() const {
Expand All @@ -93,11 +108,18 @@ class ValueVector {

private:
bool _isSequential = false;
std::unique_ptr<InMemOverflowBuffer> inMemOverflowBuffer;
std::unique_ptr<uint8_t[]> valueBuffer;
std::unique_ptr<NullMask> nullMask;
std::vector<std::shared_ptr<ValueVector>> childrenVectors;
uint32_t numBytesPerValue;
// TODO(Ziyi): Wrap this auxiliary information into a struct.
// For string vector
std::unique_ptr<InMemOverflowBuffer> inMemOverflowBuffer;
// For struct vector
std::vector<std::shared_ptr<ValueVector>> childrenVectors;
// For list vector
uint64_t capacity = common::DEFAULT_VECTOR_CAPACITY;
uint64_t size = 0;
std::shared_ptr<ValueVector> dataVector;
};

class NodeIDVector {
Expand Down
7 changes: 7 additions & 0 deletions src/include/common/vector/value_vector_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@ class ValueVectorUtils {
ValueVector& resultVector, uint64_t pos, const uint8_t* srcData);
static void copyNonNullDataWithSameTypeOutFromPos(const ValueVector& srcVector, uint64_t pos,
uint8_t* dstData, InMemOverflowBuffer& dstOverflowBuffer);
static void appendElementToList(
ValueVector& listDataVector, ValueVector& elementVector, uint64_t elementPos);
static void copyElementOutFromListVector(ValueVector& listDataVector,
uint64_t elementPosInListToCopy, ValueVector& elementVector, uint64_t elementPos);

private:
static void copyNonNullDataWithSameType(const DataType& dataType, const uint8_t* srcData,
uint8_t* dstData, InMemOverflowBuffer& inMemOverflowBuffer);
static ku_list_t convertListEntryToKuList(
const ValueVector& srcVector, uint64_t pos, InMemOverflowBuffer& dstOverflowBuffer);
static void copyKuListToVector(ValueVector& dstVector, uint64_t pos, const ku_list_t& srcList);
};

} // namespace common
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/unwind.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Unwind : public PhysicalOperator {
std::unique_ptr<evaluator::BaseExpressionEvaluator> expressionEvaluator;
std::shared_ptr<common::ValueVector> outValueVector;
uint32_t startIndex;
common::ku_list_t inputList;
common::ListEntry listEntry;
};

} // namespace processor
Expand Down
32 changes: 14 additions & 18 deletions src/include/storage/storage_structure/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,24 +150,20 @@ class ListPropertyColumn : public PropertyColumnWithOverflow {
common::Value readValueForTestingOnly(common::offset_t offset) override;

private:
inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override {
Column::lookup(transaction, resultVector, vectorPos, cursor);
if (!resultVector->isNull(vectorPos)) {
diskOverflowFile.scanSingleListOverflow(
transaction->getType(), *resultVector, vectorPos);
}
}
inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
Column::scan(transaction, resultVector, cursor);
diskOverflowFile.readListsToVector(transaction->getType(), *resultVector);
}
inline void scanWithSelState(transaction::Transaction* transaction,
common::ValueVector* resultVector, PageElementCursor& cursor) override {
Column::scanWithSelState(transaction, resultVector, cursor);
diskOverflowFile.readListsToVector(transaction->getType(), *resultVector);
}
// TODO(Ziyi): remove these functions once we have changed the storage structure for lists.
void readListsToVector(PageElementCursor& cursor,
const std::function<common::page_idx_t(common::page_idx_t)>& logicalToPhysicalPageMapper,
transaction::Transaction* transaction, common::ValueVector* resultVector,
uint64_t vectorPos, uint64_t numValuesToRead);

void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override;

void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override;

void scanWithSelState(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override;
};

class StructPropertyColumn : public Column {
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/storage_structure/disk_overflow_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class DiskOverflowFile : public StorageStructure {
}

void readListsToVector(transaction::TransactionType trxType, common::ValueVector& valueVector);

void readListToVector(transaction::TransactionType trxType, common::ku_list_t& kuList,
common::ValueVector* vector, uint64_t pos);
std::string readString(transaction::TransactionType trxType, const common::ku_string_t& str);
std::vector<std::unique_ptr<common::Value>> readList(transaction::TransactionType trxType,
const common::ku_list_t& listVal, const common::DataType& dataType);
Expand Down
Loading

0 comments on commit 1c35db3

Please sign in to comment.