Skip to content

Commit

Permalink
Merge pull request #1885 from kuzudb/node-group-list
Browse files Browse the repository at this point in the history
NodeGroup list storage refactor
  • Loading branch information
acquamarin committed Aug 3, 2023
2 parents bb4f187 + 2925928 commit 8a49c40
Show file tree
Hide file tree
Showing 31 changed files with 50,883 additions and 50,334 deletions.
2 changes: 1 addition & 1 deletion dataset/copy-test/node/csv/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, listOfInt INT64[], PRIMARY KEY (id));
100,000 changes: 50,000 additions & 50,000 deletions dataset/copy-test/node/csv/types_50k.csv

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dataset/tensor-list/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create node table tensor (ID INT64, boolTensor BOOLEAN[], doubleTensor DOUBLE[][], intTensor INT64[][][], oneDimInt INT64, PRIMARY KEY (ID));
create node table tensor (ID INT64, boolTensor BOOLEAN[], doubleTensor DOUBLE[2][], intTensor INT64[2][][], oneDimInt INT64, PRIMARY KEY (ID));
15 changes: 15 additions & 0 deletions src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
return listEntry;
}

void ListAuxiliaryBuffer::resize(uint64_t numValues) {
if (numValues <= capacity) {
size = numValues;
return;
}
bool needResizeDataVector = numValues > capacity;
while (numValues > capacity) {
capacity *= 2;
}
if (needResizeDataVector) {
resizeDataVector(dataVector.get());
}
size = numValues;
}

void ListAuxiliaryBuffer::resizeDataVector(ValueVector* dataVector) {
auto buffer = std::make_unique<uint8_t[]>(capacity * dataVector->getNumBytesPerValue());
memcpy(buffer.get(), dataVector->valueBuffer.get(), size * dataVector->getNumBytesPerValue());
Expand Down
4 changes: 4 additions & 0 deletions src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class ArrowColumnAuxiliaryBuffer : public AuxiliaryBuffer {
// actual elements of the lists in a flat, continuous storage. Each list would be represented as a
// contiguous subsequence of elements in this vector.
class ListAuxiliaryBuffer : public AuxiliaryBuffer {
friend class ListVector;

public:
ListAuxiliaryBuffer(const LogicalType& dataVectorType, storage::MemoryManager* memoryManager);

Expand All @@ -78,6 +80,8 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer {

inline void resetSize() { size = 0; }

void resize(uint64_t numValues);

private:
void resizeDataVector(ValueVector* dataVector);

Expand Down
3 changes: 3 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class ListVector {
return reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->addList(listSize);
}
static inline void resizeDataVector(ValueVector* vector, uint64_t numValues) {
reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())->resize(numValues);
}

static void copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData);
static void copyToRowData(const ValueVector* vector, uint32_t pos, uint8_t* rowData,
Expand Down
14 changes: 13 additions & 1 deletion src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ColumnChunk {
virtual void resetToEmpty();

// Include pages for null and children segments.
common::page_idx_t getNumPages() const;
virtual common::page_idx_t getNumPages() const;

void append(
common::ValueVector* vector, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
Expand All @@ -64,6 +64,12 @@ class ColumnChunk {
common::BufferPoolConstants::PAGE_4KB_SIZE;
}

inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }

virtual void write(const common::Value& val, uint64_t posToWrite);

virtual void resize(uint64_t numValues);

protected:
ColumnChunk(common::LogicalType dataType, common::offset_t numValues,
common::CopyDescription* copyDescription, bool hasNullChunk);
Expand Down Expand Up @@ -109,6 +115,10 @@ class NullColumnChunk : public ColumnChunk {

inline bool isNull(common::offset_t pos) const { return getValue<bool>(pos); }
inline void setNull(common::offset_t pos, bool isNull) { ((bool*)buffer.get())[pos] = isNull; }

void resize(uint64_t numValues) final;

void setRangeNoNull(common::offset_t startPosInChunk, uint32_t numValuesToSet);
};

class FixedListColumnChunk : public ColumnChunk {
Expand All @@ -118,6 +128,8 @@ class FixedListColumnChunk : public ColumnChunk {

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void write(const common::Value& fixedListVal, uint64_t posToWrite) final;
};

struct ColumnChunkFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
namespace kuzu {
namespace storage {

class VarSizedColumnChunk : public ColumnChunk {
class StringColumnChunk : public ColumnChunk {
public:
VarSizedColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);
StringColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);

void resetToEmpty() final;
void append(
Expand Down Expand Up @@ -36,35 +36,29 @@ class VarSizedColumnChunk : public ColumnChunk {
template<typename T>
void templateCopyVarSizedValuesFromString(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
void copyValuesFromVarList(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

void appendStringColumnChunk(VarSizedColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);
void appendVarListColumnChunk(VarSizedColumnChunk* other, common::offset_t startPosInOtherChunk,
void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);

void write(const common::Value& val, uint64_t posToWrite) override;

private:
std::unique_ptr<InMemOverflowFile> overflowFile;
PageByteCursor overflowCursor;
};

// BOOL
// BLOB
template<>
void VarSizedColumnChunk::setValueFromString<common::blob_t>(
void StringColumnChunk::setValueFromString<common::blob_t>(
const char* value, uint64_t length, uint64_t pos);
// STRING
template<>
void VarSizedColumnChunk::setValueFromString<common::ku_string_t>(
const char* value, uint64_t length, uint64_t pos);
// VAR_LIST
template<>
void VarSizedColumnChunk::setValueFromString<common::ku_list_t>(
void StringColumnChunk::setValueFromString<common::ku_string_t>(
const char* value, uint64_t length, uint64_t pos);

// STRING
template<>
std::string VarSizedColumnChunk::getValue<std::string>(common::offset_t pos) const;
std::string StringColumnChunk::getValue<std::string>(common::offset_t pos) const;

} // namespace storage
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/storage/copier/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class StructColumnChunk : public ColumnChunk {
common::LogicalType& type, const std::string& structString);
static std::string parseStructFieldName(const std::string& structString, uint64_t& curPos);
std::string parseStructFieldValue(const std::string& structString, uint64_t& curPos);
void write(const common::Value& val, uint64_t posToWrite) final;
};

} // namespace storage
Expand Down
5 changes: 5 additions & 0 deletions src/include/storage/copier/table_copy_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class TableCopyUtils {
static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::LogicalType& dataType,
const common::CopyDescription& copyDescription);
static std::unique_ptr<common::Value> getArrowFixedListVal(const std::string& l, int64_t from,
int64_t to, const common::LogicalType& dataType,
const common::CopyDescription& copyDescription);
static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::LogicalType& dataType,
const common::CopyDescription& copyDescription);
Expand Down Expand Up @@ -63,6 +66,8 @@ class TableCopyUtils {
static std::unique_ptr<common::Value> convertStringToValue(std::string element,
const common::LogicalType& type, const common::CopyDescription& copyDescription);
static std::vector<std::string> getColumnNamesToRead(catalog::TableSchema* tableSchema);
static void validateNumElementsInList(
uint64_t numElementsRead, const common::LogicalType& type);
};

} // namespace storage
Expand Down
52 changes: 52 additions & 0 deletions src/include/storage/copier/var_list_column_chunk.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include "storage/copier/column_chunk.h"

using namespace kuzu::common;

namespace kuzu {
namespace storage {

struct VarListDataColumnChunk {
std::unique_ptr<ColumnChunk> dataChunk;
uint64_t numValuesInDataChunk;
uint64_t capacityInDataChunk;

VarListDataColumnChunk(std::unique_ptr<ColumnChunk> dataChunk)
: dataChunk{std::move(dataChunk)}, numValuesInDataChunk{0},
capacityInDataChunk{StorageConstants::NODE_GROUP_SIZE} {}
};

class VarListColumnChunk : public ColumnChunk {
public:
VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription);

inline ColumnChunk* getDataColumnChunk() const {
return varListDataColumnChunk.dataChunk.get();
}

void setValueFromString(const char* value, uint64_t length, uint64_t pos);

private:
inline common::page_idx_t getNumPages() const final {
return varListDataColumnChunk.dataChunk->getNumPages() + ColumnChunk::getNumPages();
}

void append(arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void copyVarListFromArrowString(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);

void copyVarListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);

void write(const common::Value& listVal, uint64_t posToWrite) override;

void resizeDataChunk(uint64_t numValues);

private:
VarListDataColumnChunk varListDataColumnChunk;
};

} // namespace storage
} // namespace kuzu
11 changes: 7 additions & 4 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class NodeColumn {

virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);
virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0);
virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);

Expand All @@ -75,19 +78,19 @@ class NodeColumn {
return metadataDA->getNumElements(transaction->getType());
}

void checkpointInMemory();
void rollbackInMemory();
virtual void checkpointInMemory();
virtual void rollbackInMemory();

protected:
virtual void scanInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
uint64_t numValuesToScan, common::ValueVector* resultVector, uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t posInVector);

void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
namespace kuzu {
namespace storage {

struct VarSizedNodeColumnFunc {
struct StringNodeColumnFunc {
static void writeStringValuesToPage(
uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVector);
};

class VarSizedNodeColumn : public NodeColumn {
class StringNodeColumn : public NodeColumn {
public:
VarSizedNodeColumn(common::LogicalType dataType,
const catalog::MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal);
StringNodeColumn(common::LogicalType dataType, const catalog::MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal);

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
Expand All @@ -25,9 +28,6 @@ class VarSizedNodeColumn : public NodeColumn {
private:
void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr,
common::ValueVector* resultVector, common::page_idx_t chunkStartPageIdx);
void readListValueFromOvf(transaction::Transaction* transaction, common::ku_list_t kuList,
common::ValueVector* resultVector, uint64_t posInVector,
common::page_idx_t chunkStartPageIdx);

private:
common::page_idx_t ovfPageIdxInChunk;
Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/store/struct_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ class StructNodeColumn : public NodeColumn {
StructNodeColumn(common::LogicalType dataType, const catalog::MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal);

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
Expand Down
Loading

0 comments on commit 8a49c40

Please sign in to comment.