Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NodeGroup list storage refactor #1885

Merged
merged 1 commit into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dataset/copy-test/node/csv/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, listOfInt INT64[], PRIMARY KEY (id));
100,000 changes: 50,000 additions & 50,000 deletions dataset/copy-test/node/csv/types_50k.csv

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dataset/tensor-list/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create node table tensor (ID INT64, boolTensor BOOLEAN[], doubleTensor DOUBLE[][], intTensor INT64[][][], oneDimInt INT64, PRIMARY KEY (ID));
create node table tensor (ID INT64, boolTensor BOOLEAN[], doubleTensor DOUBLE[2][], intTensor INT64[2][][], oneDimInt INT64, PRIMARY KEY (ID));
15 changes: 15 additions & 0 deletions src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
return listEntry;
}

void ListAuxiliaryBuffer::resize(uint64_t numValues) {
acquamarin marked this conversation as resolved.
Show resolved Hide resolved
if (numValues <= capacity) {
size = numValues;
return;
}
bool needResizeDataVector = numValues > capacity;
while (numValues > capacity) {
capacity *= 2;
}
if (needResizeDataVector) {
resizeDataVector(dataVector.get());
}
size = numValues;
}

void ListAuxiliaryBuffer::resizeDataVector(ValueVector* dataVector) {
auto buffer = std::make_unique<uint8_t[]>(capacity * dataVector->getNumBytesPerValue());
memcpy(buffer.get(), dataVector->valueBuffer.get(), size * dataVector->getNumBytesPerValue());
Expand Down
4 changes: 4 additions & 0 deletions src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class ArrowColumnAuxiliaryBuffer : public AuxiliaryBuffer {
// actual elements of the lists in a flat, continuous storage. Each list would be represented as a
// contiguous subsequence of elements in this vector.
class ListAuxiliaryBuffer : public AuxiliaryBuffer {
friend class ListVector;

public:
ListAuxiliaryBuffer(const LogicalType& dataVectorType, storage::MemoryManager* memoryManager);

Expand All @@ -78,6 +80,8 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer {

inline void resetSize() { size = 0; }

void resize(uint64_t numValues);

private:
void resizeDataVector(ValueVector* dataVector);

Expand Down
3 changes: 3 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class ListVector {
return reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->addList(listSize);
}
// Forwards to ListAuxiliaryBuffer::resize on the auxiliary buffer of `vector`.
// NOTE(review): assumes `vector` carries a ListAuxiliaryBuffer; the cast is unchecked.
static inline void resizeDataVector(ValueVector* vector, uint64_t numValues) {
    auto* listBuffer = reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
    listBuffer->resize(numValues);
}

static void copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData);
static void copyToRowData(const ValueVector* vector, uint32_t pos, uint8_t* rowData,
Expand Down
14 changes: 13 additions & 1 deletion src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ColumnChunk {
virtual void resetToEmpty();

// Include pages for null and children segments.
common::page_idx_t getNumPages() const;
virtual common::page_idx_t getNumPages() const;

void append(
common::ValueVector* vector, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
Expand All @@ -64,6 +64,12 @@ class ColumnChunk {
common::BufferPoolConstants::PAGE_4KB_SIZE;
}

inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }

virtual void write(const common::Value& val, uint64_t posToWrite);

virtual void resize(uint64_t numValues);

protected:
ColumnChunk(common::LogicalType dataType, common::offset_t numValues,
common::CopyDescription* copyDescription, bool hasNullChunk);
Expand Down Expand Up @@ -109,6 +115,10 @@ class NullColumnChunk : public ColumnChunk {

inline bool isNull(common::offset_t pos) const { return getValue<bool>(pos); }
inline void setNull(common::offset_t pos, bool isNull) { ((bool*)buffer.get())[pos] = isNull; }

void resize(uint64_t numValues) final;

void setRangeNoNull(common::offset_t startPosInChunk, uint32_t numValuesToSet);
};

class FixedListColumnChunk : public ColumnChunk {
Expand All @@ -118,6 +128,8 @@ class FixedListColumnChunk : public ColumnChunk {

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void write(const common::Value& fixedListVal, uint64_t posToWrite) final;
};

struct ColumnChunkFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
namespace kuzu {
namespace storage {

class VarSizedColumnChunk : public ColumnChunk {
class StringColumnChunk : public ColumnChunk {
public:
VarSizedColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);
StringColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);

void resetToEmpty() final;
void append(
Expand Down Expand Up @@ -36,35 +36,29 @@ class VarSizedColumnChunk : public ColumnChunk {
template<typename T>
void templateCopyVarSizedValuesFromString(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
void copyValuesFromVarList(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

void appendStringColumnChunk(VarSizedColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);
void appendVarListColumnChunk(VarSizedColumnChunk* other, common::offset_t startPosInOtherChunk,
void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);

void write(const common::Value& val, uint64_t posToWrite) override;

private:
std::unique_ptr<InMemOverflowFile> overflowFile;
PageByteCursor overflowCursor;
};

// BOOL
// BLOB
template<>
void VarSizedColumnChunk::setValueFromString<common::blob_t>(
void StringColumnChunk::setValueFromString<common::blob_t>(
const char* value, uint64_t length, uint64_t pos);
// STRING
template<>
void VarSizedColumnChunk::setValueFromString<common::ku_string_t>(
const char* value, uint64_t length, uint64_t pos);
// VAR_LIST
template<>
void VarSizedColumnChunk::setValueFromString<common::ku_list_t>(
void StringColumnChunk::setValueFromString<common::ku_string_t>(
const char* value, uint64_t length, uint64_t pos);

// STRING
template<>
std::string VarSizedColumnChunk::getValue<std::string>(common::offset_t pos) const;
std::string StringColumnChunk::getValue<std::string>(common::offset_t pos) const;

} // namespace storage
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/storage/copier/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class StructColumnChunk : public ColumnChunk {
common::LogicalType& type, const std::string& structString);
static std::string parseStructFieldName(const std::string& structString, uint64_t& curPos);
std::string parseStructFieldValue(const std::string& structString, uint64_t& curPos);
void write(const common::Value& val, uint64_t posToWrite) final;
};

} // namespace storage
Expand Down
5 changes: 5 additions & 0 deletions src/include/storage/copier/table_copy_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class TableCopyUtils {
static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::LogicalType& dataType,
const common::CopyDescription& copyDescription);
static std::unique_ptr<common::Value> getArrowFixedListVal(const std::string& l, int64_t from,
int64_t to, const common::LogicalType& dataType,
const common::CopyDescription& copyDescription);
static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::LogicalType& dataType,
const common::CopyDescription& copyDescription);
Expand Down Expand Up @@ -63,6 +66,8 @@ class TableCopyUtils {
static std::unique_ptr<common::Value> convertStringToValue(std::string element,
const common::LogicalType& type, const common::CopyDescription& copyDescription);
static std::vector<std::string> getColumnNamesToRead(catalog::TableSchema* tableSchema);
static void validateNumElementsInList(
uint64_t numElementsRead, const common::LogicalType& type);
};

} // namespace storage
Expand Down
52 changes: 52 additions & 0 deletions src/include/storage/copier/var_list_column_chunk.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include "storage/copier/column_chunk.h"

using namespace kuzu::common;

namespace kuzu {
namespace storage {

// Pairs an owned ColumnChunk with two counters describing it.
// NOTE(review): presumably the chunk stores the flattened list elements of a var-list column and
// the counters track how many element slots are used / available -- confirm against the .cpp.
struct VarListDataColumnChunk {
    // Owned chunk; taken over from the constructor argument.
    std::unique_ptr<ColumnChunk> dataChunk;
    // Starts at zero.
    uint64_t numValuesInDataChunk{0};
    // Starts at one node group's worth of values.
    uint64_t capacityInDataChunk{StorageConstants::NODE_GROUP_SIZE};

    // Takes ownership of `dataChunk`; the counters keep their default initial values.
    // NOTE(review): this single-argument constructor is implicit; consider marking it `explicit`
    // once all call sites are known to use direct initialization.
    VarListDataColumnChunk(std::unique_ptr<ColumnChunk> dataChunk)
        : dataChunk{std::move(dataChunk)} {}
};

class VarListColumnChunk : public ColumnChunk {
public:
VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription);

inline ColumnChunk* getDataColumnChunk() const {
return varListDataColumnChunk.dataChunk.get();
}

void setValueFromString(const char* value, uint64_t length, uint64_t pos);

private:
inline common::page_idx_t getNumPages() const final {
return varListDataColumnChunk.dataChunk->getNumPages() + ColumnChunk::getNumPages();
}

void append(arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void copyVarListFromArrowString(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);

void copyVarListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);

void write(const common::Value& listVal, uint64_t posToWrite) override;

void resizeDataChunk(uint64_t numValues);

private:
VarListDataColumnChunk varListDataColumnChunk;
};

} // namespace storage
} // namespace kuzu
11 changes: 7 additions & 4 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class NodeColumn {

virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);
virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0);
virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);

Expand All @@ -75,19 +78,19 @@ class NodeColumn {
return metadataDA->getNumElements(transaction->getType());
}

void checkpointInMemory();
void rollbackInMemory();
virtual void checkpointInMemory();
virtual void rollbackInMemory();

protected:
virtual void scanInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
uint64_t numValuesToScan, common::ValueVector* resultVector, uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t posInVector);

void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
namespace kuzu {
namespace storage {

struct VarSizedNodeColumnFunc {
struct StringNodeColumnFunc {
static void writeStringValuesToPage(
uint8_t* frame, uint16_t posInFrame, common::ValueVector* vector, uint32_t posInVector);
};

class VarSizedNodeColumn : public NodeColumn {
class StringNodeColumn : public NodeColumn {
public:
VarSizedNodeColumn(common::LogicalType dataType,
const catalog::MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal);
StringNodeColumn(common::LogicalType dataType, const catalog::MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal);

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
Expand All @@ -25,9 +28,6 @@ class VarSizedNodeColumn : public NodeColumn {
private:
void readStringValueFromOvf(transaction::Transaction* transaction, common::ku_string_t& kuStr,
common::ValueVector* resultVector, common::page_idx_t chunkStartPageIdx);
void readListValueFromOvf(transaction::Transaction* transaction, common::ku_list_t kuList,
common::ValueVector* resultVector, uint64_t posInVector,
common::page_idx_t chunkStartPageIdx);

private:
common::page_idx_t ovfPageIdxInChunk;
Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/store/struct_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ class StructNodeColumn : public NodeColumn {
StructNodeColumn(common::LogicalType dataType, const catalog::MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal);

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
Expand Down
Loading
Loading