Skip to content

Commit

Permalink
NodeGroup list storage refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Aug 3, 2023
1 parent baf9e56 commit 072b545
Show file tree
Hide file tree
Showing 31 changed files with 50,848 additions and 50,278 deletions.
2 changes: 1 addition & 1 deletion dataset/copy-test/node/csv/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, listOfInt INT64[], PRIMARY KEY (id));
100,000 changes: 50,000 additions & 50,000 deletions dataset/copy-test/node/csv/types_50k.csv

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dataset/tensor-list/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create node table tensor (ID INT64, boolTensor BOOLEAN[], doubleTensor DOUBLE[][], intTensor INT64[][][], oneDimInt INT64, PRIMARY KEY (ID));
create node table tensor (ID INT64, boolTensor BOOLEAN[], doubleTensor DOUBLE[2][], intTensor INT64[2][][], oneDimInt INT64, PRIMARY KEY (ID));
15 changes: 15 additions & 0 deletions src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
return listEntry;
}

void ListAuxiliaryBuffer::resize(uint64_t numValues) {
if (numValues <= capacity) {
size = numValues;
return;
}
bool needResizeDataVector = numValues > capacity;
while (numValues > capacity) {
capacity *= 2;
}
if (needResizeDataVector) {
resizeDataVector(dataVector.get());
}
size = numValues;
}

void ListAuxiliaryBuffer::resizeDataVector(ValueVector* dataVector) {
auto buffer = std::make_unique<uint8_t[]>(capacity * dataVector->getNumBytesPerValue());
memcpy(buffer.get(), dataVector->valueBuffer.get(), size * dataVector->getNumBytesPerValue());
Expand Down
4 changes: 4 additions & 0 deletions src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class ArrowColumnAuxiliaryBuffer : public AuxiliaryBuffer {
// actual elements of the lists in a flat, contiguous storage. Each list would be represented as a
// contiguous subsequence of elements in this vector.
class ListAuxiliaryBuffer : public AuxiliaryBuffer {
friend class ListVector;

public:
ListAuxiliaryBuffer(const LogicalType& dataVectorType, storage::MemoryManager* memoryManager);

Expand All @@ -78,6 +80,8 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer {

inline void resetSize() { size = 0; }

void resize(uint64_t numValues);

private:
void resizeDataVector(ValueVector* dataVector);

Expand Down
3 changes: 3 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class ListVector {
return reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->addList(listSize);
}
// Forwards to ListAuxiliaryBuffer::resize on `vector`'s auxiliary buffer, which is reinterpreted
// as a ListAuxiliaryBuffer (no runtime type check is performed here).
static inline void resizeDataVector(ValueVector* vector, uint64_t numValues) {
    auto* listBuffer = reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
    listBuffer->resize(numValues);
}

static void copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData);
static void copyToRowData(const ValueVector* vector, uint32_t pos, uint8_t* rowData,
Expand Down
12 changes: 11 additions & 1 deletion src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ColumnChunk {
virtual void resetToEmpty();

// Include pages for null and children segments.
common::page_idx_t getNumPages() const;
virtual common::page_idx_t getNumPages() const;

void append(
common::ValueVector* vector, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
Expand All @@ -64,6 +64,12 @@ class ColumnChunk {
common::BufferPoolConstants::PAGE_4KB_SIZE;
}

// Returns this chunk's numBytesPerValue member.
inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }

virtual void writeVal(const common::Value& val, uint64_t posToWrite);

virtual void resize(uint64_t numValues);

protected:
ColumnChunk(common::LogicalType dataType, common::offset_t numValues,
common::CopyDescription* copyDescription, bool hasNullChunk);
Expand Down Expand Up @@ -109,6 +115,8 @@ class NullColumnChunk : public ColumnChunk {

// Returns the bool stored at `pos` in this chunk, read via getValue<bool>.
inline bool isNull(common::offset_t pos) const { return getValue<bool>(pos); }
// Writes `isNull` at index `pos`, treating the chunk's buffer as an array of bool.
// No bounds check is performed here.
inline void setNull(common::offset_t pos, bool isNull) { ((bool*)buffer.get())[pos] = isNull; }

void resize(uint64_t numValues) final;
};

class FixedListColumnChunk : public ColumnChunk {
Expand All @@ -118,6 +126,8 @@ class FixedListColumnChunk : public ColumnChunk {

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void writeVal(const common::Value& fixedListVal, uint64_t posToWrite) final;
};

struct ColumnChunkFactory {
Expand Down
42 changes: 42 additions & 0 deletions src/include/storage/copier/list_column_chunk.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include "storage/copier/column_chunk.h"

using namespace kuzu::common;

namespace kuzu {
namespace storage {

class ListColumnChunk : public ColumnChunk {
public:
ListColumnChunk(LogicalType dataType, CopyDescription* copyDescription);

inline ColumnChunk* getDataColumnChunk() const { return dataChunk.get(); }

void setValueFromString(const char* value, uint64_t length, uint64_t pos);

private:
inline common::page_idx_t getNumPages() const final {
return dataChunk->getNumPages() + ColumnChunk::getNumPages();
}

void append(arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void copyListFromArrowString(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);

void copyListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);

void writeVal(const common::Value& listVal, uint64_t posToWrite) override;

void resizeDataChunk(uint64_t numValues);

private:
std::unique_ptr<ColumnChunk> dataChunk;
uint64_t numValuesInDataChunk;
uint64_t capacityInDataChunk;
};

} // namespace storage
} // namespace kuzu
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
namespace kuzu {
namespace storage {

class VarSizedColumnChunk : public ColumnChunk {
class StringColumnChunk : public ColumnChunk {
public:
VarSizedColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);
StringColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);

void resetToEmpty() final;
void append(
Expand Down Expand Up @@ -39,32 +39,30 @@ class VarSizedColumnChunk : public ColumnChunk {
void copyValuesFromVarList(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

void appendStringColumnChunk(VarSizedColumnChunk* other, common::offset_t startPosInOtherChunk,
void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);
void appendVarListColumnChunk(VarSizedColumnChunk* other, common::offset_t startPosInOtherChunk,
void appendVarListColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);

void writeVal(const common::Value& val, uint64_t posToWrite) override;

private:
std::unique_ptr<InMemOverflowFile> overflowFile;
PageByteCursor overflowCursor;
};

// BOOL
// BLOB
template<>
void VarSizedColumnChunk::setValueFromString<common::blob_t>(
void StringColumnChunk::setValueFromString<common::blob_t>(
const char* value, uint64_t length, uint64_t pos);
// STRING
template<>
void VarSizedColumnChunk::setValueFromString<common::ku_string_t>(
const char* value, uint64_t length, uint64_t pos);
// VAR_LIST
template<>
void VarSizedColumnChunk::setValueFromString<common::ku_list_t>(
void StringColumnChunk::setValueFromString<common::ku_string_t>(
const char* value, uint64_t length, uint64_t pos);

// STRING
template<>
std::string VarSizedColumnChunk::getValue<std::string>(common::offset_t pos) const;
std::string StringColumnChunk::getValue<std::string>(common::offset_t pos) const;

} // namespace storage
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/storage/copier/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class StructColumnChunk : public ColumnChunk {
common::LogicalType& type, const std::string& structString);
static std::string parseStructFieldName(const std::string& structString, uint64_t& curPos);
std::string parseStructFieldValue(const std::string& structString, uint64_t& curPos);
void writeVal(const common::Value& val, uint64_t posToWrite) final;
};

} // namespace storage
Expand Down
5 changes: 5 additions & 0 deletions src/include/storage/copier/table_copy_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class TableCopyUtils {
static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::LogicalType& dataType,
const common::CopyDescription& copyDescription);
static std::unique_ptr<common::Value> getArrowFixedListVal(const std::string& l, int64_t from,
int64_t to, const common::LogicalType& dataType,
const common::CopyDescription& copyDescription);
static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::LogicalType& dataType,
const common::CopyDescription& copyDescription);
Expand Down Expand Up @@ -63,6 +66,8 @@ class TableCopyUtils {
static std::unique_ptr<common::Value> convertStringToValue(std::string element,
const common::LogicalType& type, const common::CopyDescription& copyDescription);
static std::vector<std::string> getColumnNamesToRead(catalog::TableSchema* tableSchema);
static void validateNumElementsInList(
uint64_t numElementsRead, const common::LogicalType& type);
};

} // namespace storage
Expand Down
72 changes: 72 additions & 0 deletions src/include/storage/store/list_node_column.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#pragma once

#include "node_column.h"

namespace kuzu {
namespace storage {

class ListNodeColumn : public NodeColumn {
public:
ListNodeColumn(common::LogicalType dataType, const catalog::MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal)
: NodeColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal,
true /* requireNullColumn */} {
dataNodeColumn =
NodeColumnFactory::createNodeColumn(*common::VarListType::getChildType(&this->dataType),
*metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal);
}

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;

void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t posInVector) final;

common::page_idx_t append(
ColumnChunk* columnChunk, common::page_idx_t startPageIdx, uint64_t nodeGroupIdx) override;

private:
void scanUnfiltered(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* offsetVector, common::ValueVector* resultVector);

void scanFiltered(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* offsetVector, common::ValueVector* resultVector);

void checkpointInMemory() final;

void rollbackInMemory() final;

void scanWithOffsets(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup,
common::offset_t endOffsetInGroup, common::ValueVector* resultVector,
uint64_t posToWriteListEntry) final;

common::offset_t readOffset(transaction::Transaction* transaction, common::offset_t valuePos);

void scanListOffset(transaction::Transaction* transaction, common::offset_t startNodeOffset,
common::node_group_idx_t nodeGroupIdx, common::ValueVector* offsetVector);

inline common::offset_t readListOffsetInStorage(
transaction::Transaction* transaction, common::offset_t nodeOffset) {
return nodeOffset == 0 ? 0 : readOffset(transaction, nodeOffset - 1);
}

inline common::offset_t getListOffsetInStorage(transaction::Transaction* transaction,
common::ValueVector* offsetVector, common::offset_t nodeOffset, uint64_t nodePos) {
return nodePos == 0 ? readListOffsetInStorage(transaction, nodeOffset) :
offsetVector->getValue<common::offset_t>(nodePos - 1);
}

inline uint64_t getListLength(transaction::Transaction* transaction,
common::ValueVector* offsetVector, common::offset_t nodeOffset, uint64_t nodePos) {
return getListOffsetInStorage(transaction, offsetVector, nodeOffset + 1, nodePos + 1) -
getListOffsetInStorage(transaction, offsetVector, nodeOffset, nodePos);
}

private:
std::unique_ptr<NodeColumn> dataNodeColumn;
};

} // namespace storage
} // namespace kuzu
13 changes: 9 additions & 4 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,24 @@ class NodeColumn {
return metadataDA->getNumElements(transaction->getType());
}

void checkpointInMemory();
void rollbackInMemory();
virtual void checkpointInMemory();
virtual void rollbackInMemory();

virtual void scanWithOffsets(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup,
common::offset_t endOffsetInGroup, common::ValueVector* resultVector,
uint64_t posToWriteListEntry = 0);

protected:
virtual void scanInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
uint64_t numValuesToScan, common::ValueVector* resultVector, uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t posInVector);

void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
namespace kuzu {
namespace storage {

struct VarSizedNodeColumnFunc {
struct StringNodeColumnFunc {
static void writeStringValuesToPage(
uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVector);
};

class VarSizedNodeColumn : public NodeColumn {
class StringNodeColumn : public NodeColumn {
public:
VarSizedNodeColumn(common::LogicalType dataType,
const catalog::MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal);
StringNodeColumn(common::LogicalType dataType, const catalog::MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal);

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
Expand All @@ -25,9 +24,10 @@ class VarSizedNodeColumn : public NodeColumn {
private:
void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr,
common::ValueVector* resultVector, common::page_idx_t chunkStartPageIdx);
void readListValueFromOvf(transaction::Transaction* transaction, common::ku_list_t kuList,
common::ValueVector* resultVector, uint64_t posInVector,
common::page_idx_t chunkStartPageIdx);
void scanWithOffsets(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup,
common::offset_t endOffsetInGroup, common::ValueVector* resultVector,
uint64_t posToWriteListEntry) final;

private:
common::page_idx_t ovfPageIdxInChunk;
Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/store/struct_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ class StructNodeColumn : public NodeColumn {
common::ValueVector* resultVector) final;
void lookupInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
void scanWithOffsets(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup,
common::offset_t endOffsetInGroup, common::ValueVector* resultVector,
uint64_t posToWriteListEntry) final;
};

} // namespace storage
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "processor/operator/copy/copy_node.h"

#include "common/string_utils.h"
#include "storage/copier/var_sized_column_chunk.h"
#include "storage/copier/string_column_chunk.h"

using namespace kuzu::catalog;
using namespace kuzu::common;
Expand Down Expand Up @@ -186,7 +186,7 @@ void CopyNode::appendToPKIndex(
}
} break;
case LogicalTypeID::STRING: {
auto varSizedChunk = (VarSizedColumnChunk*)chunk;
auto varSizedChunk = (StringColumnChunk*)chunk;
for (auto i = 0u; i < numValues; i++) {
auto offset = i + startOffset;
auto value = varSizedChunk->getValue<std::string>(i);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/copier/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ add_library(kuzu_storage_in_mem_csv_copier
rel_copy_executor.cpp
struct_column_chunk.cpp
table_copy_utils.cpp
var_sized_column_chunk.cpp)
string_column_chunk.cpp
list_column_chunk.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_storage_in_mem_csv_copier>
Expand Down
Loading

0 comments on commit 072b545

Please sign in to comment.