Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic framework of local storage for node group #1928

Merged
merged 1 commit into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dataset/copy-fault-tests/duplicate-ids/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
create node table person (ID INT64, fName STRING, PRIMARY KEY (ID));
create node table person (ID INT64, fName STRING, PRIMARY KEY (ID));
create node table org (ID STRING, fName STRING, PRIMARY KEY (ID));
4 changes: 4 additions & 0 deletions dataset/copy-fault-tests/duplicate-ids/vOrg.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
10,Guodong
24,Semih
31,Xiyang
10,Ziyi
5 changes: 5 additions & 0 deletions src/common/exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ std::string ExceptionMessage::invalidPKType(const std::string& type) {
type);
}

std::string ExceptionMessage::overLargeStringValueException(const std::string& length) {
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
return StringUtils::string_format(
"Maximum length of strings is 4096. Input string's length is {}.", length);
}

} // namespace common
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct ExceptionMessage {
static inline std::string notAllowCopyOnNonEmptyTableException() {
return "COPY commands can only be executed once on a table.";
}
static std::string overLargeStringValueException(const std::string& length);
};

class Exception : public std::exception {
Expand Down
6 changes: 3 additions & 3 deletions src/include/common/types/ku_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace common {

struct ku_string_t {

static const uint64_t PREFIX_LENGTH = 4;
static const uint64_t INLINED_SUFFIX_LENGTH = 8;
static const uint64_t SHORT_STR_LENGTH = PREFIX_LENGTH + INLINED_SUFFIX_LENGTH;
static constexpr uint64_t PREFIX_LENGTH = 4;
static constexpr uint64_t INLINED_SUFFIX_LENGTH = 8;
static constexpr uint64_t SHORT_STR_LENGTH = PREFIX_LENGTH + INLINED_SUFFIX_LENGTH;

uint32_t len;
uint8_t prefix[PREFIX_LENGTH];
Expand Down
1 change: 1 addition & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ValueVector {

inline uint32_t getNumBytesPerValue() const { return numBytesPerValue; }

// TODO(Guodong): Rename this to getValueRef
template<typename T>
inline T& getValue(uint32_t pos) const {
return ((T*)valueBuffer.get())[pos];
Expand Down
4 changes: 2 additions & 2 deletions src/include/main/storage_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class StorageDriver {
uint64_t getNumRels(const std::string& relName);

private:
void scanColumn(
storage::NodeColumn* column, common::offset_t* offsets, size_t size, uint8_t* result);
void scanColumn(transaction::Transaction* transaction, storage::NodeColumn* column,
common::offset_t* offsets, size_t size, uint8_t* result);

private:
catalog::Catalog* catalog;
Expand Down
30 changes: 17 additions & 13 deletions src/include/processor/operator/persistent/set_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class NodeSetExecutor {

void init(ResultSet* resultSet, ExecutionContext* context);

virtual void set() = 0;
virtual void set(ExecutionContext* context) = 0;

virtual std::unique_ptr<NodeSetExecutor> copy() const = 0;

Expand All @@ -37,46 +37,50 @@ class NodeSetExecutor {
common::ValueVector* rhsVector = nullptr;
};

struct NodeSetInfo {
storage::NodeTable* table;
common::property_id_t propertyID;
};

class SingleLabelNodeSetExecutor : public NodeSetExecutor {
public:
SingleLabelNodeSetExecutor(storage::NodeColumn* column, const DataPos& nodeIDPos,
SingleLabelNodeSetExecutor(NodeSetInfo setInfo, const DataPos& nodeIDPos,
const DataPos& lhsVectorPos, std::unique_ptr<evaluator::ExpressionEvaluator> evaluator)
: NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, column{column} {}
: NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, setInfo{setInfo} {}

SingleLabelNodeSetExecutor(const SingleLabelNodeSetExecutor& other)
: NodeSetExecutor{other.nodeIDPos, other.lhsVectorPos, other.evaluator->clone()},
column{other.column} {}
setInfo(other.setInfo) {}

void set() final;
void set(ExecutionContext* context) final;

inline std::unique_ptr<NodeSetExecutor> copy() const final {
return std::make_unique<SingleLabelNodeSetExecutor>(*this);
}

private:
storage::NodeColumn* column;
NodeSetInfo setInfo;
};

class MultiLabelNodeSetExecutor : public NodeSetExecutor {
public:
MultiLabelNodeSetExecutor(
std::unordered_map<common::table_id_t, storage::NodeColumn*> tableIDToColumn,
MultiLabelNodeSetExecutor(std::unordered_map<common::table_id_t, NodeSetInfo> tableIDToSetInfo,
const DataPos& nodeIDPos, const DataPos& lhsVectorPos,
std::unique_ptr<evaluator::ExpressionEvaluator> evaluator)
: NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, tableIDToColumn{std::move(
tableIDToColumn)} {}
: NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)},
tableIDToSetInfo{std::move(tableIDToSetInfo)} {}
MultiLabelNodeSetExecutor(const MultiLabelNodeSetExecutor& other)
: NodeSetExecutor{other.nodeIDPos, other.lhsVectorPos, other.evaluator->clone()},
tableIDToColumn{other.tableIDToColumn} {}
tableIDToSetInfo{other.tableIDToSetInfo} {}

void set() final;
void set(ExecutionContext* context) final;

inline std::unique_ptr<NodeSetExecutor> copy() const final {
return std::make_unique<MultiLabelNodeSetExecutor>(*this);
}

private:
std::unordered_map<common::table_id_t, storage::NodeColumn*> tableIDToColumn;
std::unordered_map<common::table_id_t, NodeSetInfo> tableIDToSetInfo;
};

class RelSetExecutor {
Expand Down
11 changes: 11 additions & 0 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@

class NullColumnChunk;

struct ColumnChunkMetadata {
common::page_idx_t pageIdx = common::INVALID_PAGE_IDX;
common::page_idx_t numPages = 0;
};

struct OverflowColumnChunkMetadata : public ColumnChunkMetadata {
common::offset_t lastOffsetInPage;
};

// Base data segment covers all fixed-sized data types.
// Some template functions are almost duplicated from `InMemColumnChunk`, which is intended.
// Currently, `InMemColumnChunk` is used to populate rel columns. Eventually, we will merge them.
Expand Down Expand Up @@ -76,6 +85,8 @@
}

inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }
inline uint64_t getNumBytes() const { return numBytes; }

Check warning on line 88 in src/include/storage/copier/column_chunk.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/copier/column_chunk.h#L88

Added line #L88 was not covered by tests
inline uint8_t* getData() { return buffer.get(); }

virtual void write(const common::Value& val, uint64_t posToWrite);

Expand Down
7 changes: 6 additions & 1 deletion src/include/storage/copier/string_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class StringColumnChunk : public ColumnChunk {
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

virtual void update(common::ValueVector* vector, common::vector_idx_t vectorIdx);

template<typename T>
void setValueFromString(const char* value, uint64_t length, uint64_t pos) {
throw common::NotImplementedException("VarSizedColumnChunk::setValueFromString");
Expand All @@ -28,11 +30,14 @@ class StringColumnChunk : public ColumnChunk {

common::page_idx_t flushOverflowBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);

private:
inline InMemOverflowFile* getOverflowFile() { return overflowFile.get(); }
inline common::offset_t getLastOffsetInPage() { return overflowCursor.offsetInPage; }

inline common::page_idx_t getNumPages() const final {
return ColumnChunk::getNumPages() + overflowFile->getNumPages();
}

private:
template<typename T>
void templateCopyStringArrowArray(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
Expand Down
39 changes: 39 additions & 0 deletions src/include/storage/local_storage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include "local_table.h"

namespace kuzu {
namespace storage {
class NodesStore;

// Data structures in LocalStorage are not thread-safe.
// For now, we only support single thread insertions and updates. Once we optimize them with
// multiple threads, LocalStorage and its related data structures should be reworked to be
// thread-safe.
class LocalStorage {
public:
LocalStorage(storage::StorageManager* storageManager, storage::MemoryManager* mm);

void scan(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void update(common::table_id_t tableID, common::property_id_t propertyID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
void update(common::table_id_t tableID, common::property_id_t propertyID,
common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector);

void prepareCommit();
void prepareRollback();

private:
std::map<common::table_id_t, std::unique_ptr<LocalTable>> tables;
storage::NodesStore* nodesStore;
storage::MemoryManager* mm;
};

} // namespace storage
} // namespace kuzu
124 changes: 124 additions & 0 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#pragma once

#include <bitset>

#include "common/vector/value_vector.h"

namespace kuzu {
namespace storage {
class NodeColumn;
class NodeTable;

class LocalVector {
public:
LocalVector(const common::LogicalType& logicalType, storage::MemoryManager* mm) {
vector = std::make_unique<common::ValueVector>(logicalType, mm);
vector->setState(std::make_shared<common::DataChunkState>());
vector->state->selVector->resetSelectorToValuePosBuffer();
}

virtual ~LocalVector() = default;

void scan(common::ValueVector* resultVector) const;
void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
common::sel_t offsetInResultVector);
virtual void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector);

std::unique_ptr<common::ValueVector> vector;
// This mask is mainly to speed the lookup operation up. Otherwise, we have to do binary search
// to check if the value at an offset has been updated or not.
std::bitset<common::DEFAULT_VECTOR_CAPACITY> validityMask;
};

class StringLocalVector : public LocalVector {
public:
explicit StringLocalVector(storage::MemoryManager* mm)
: LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, ovfStringLength{
0} {};

void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector) final;

uint64_t ovfStringLength;
};

struct LocalVectorFactory {
static std::unique_ptr<LocalVector> createLocalVectorData(
const common::LogicalType& logicalType, storage::MemoryManager* mm);
};

class LocalColumnChunk {
public:
explicit LocalColumnChunk(storage::MemoryManager* mm) : mm{mm} {};

void scan(common::vector_idx_t vectorIdx, common::ValueVector* resultVector);
void lookup(common::vector_idx_t vectorIdx, common::sel_t offsetInLocalVector,
common::ValueVector* resultVector, common::sel_t offsetInResultVector);
void update(common::vector_idx_t vectorIdx, common::sel_t offsetInVector,
common::ValueVector* vectorToWriteFrom, common::sel_t pos);

std::map<common::vector_idx_t, std::unique_ptr<LocalVector>> vectors;
storage::MemoryManager* mm;
};

class LocalColumn {
public:
explicit LocalColumn(storage::NodeColumn* column) : column{column} {};
virtual ~LocalColumn() = default;

void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector,
storage::MemoryManager* mm);
void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, storage::MemoryManager* mm);

virtual void prepareCommit();

protected:
virtual void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx);

protected:
std::map<common::node_group_idx_t, std::unique_ptr<LocalColumnChunk>> chunks;
storage::NodeColumn* column;
};

class StringLocalColumn : public LocalColumn {
public:
explicit StringLocalColumn(storage::NodeColumn* column) : LocalColumn{column} {};

private:
void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
void commitLocalChunkOutOfPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
};

struct LocalColumnFactory {
static std::unique_ptr<LocalColumn> createLocalColumn(storage::NodeColumn* column);
};

class LocalTable {
public:
explicit LocalTable(storage::NodeTable* table) : table{table} {};

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void update(common::property_id_t propertyID, common::ValueVector* nodeIDVector,
common::ValueVector* propertyVector, storage::MemoryManager* mm);
void update(common::property_id_t propertyID, common::offset_t nodeOffset,
common::ValueVector* propertyVector, common::sel_t posInPropertyVector,
storage::MemoryManager* mm);

void prepareCommit();

private:
std::map<common::property_id_t, std::unique_ptr<LocalColumn>> columns;
storage::NodeTable* table;
};

} // namespace storage
} // namespace kuzu
26 changes: 22 additions & 4 deletions src/include/storage/storage_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,34 @@

class StorageUtils {
public:
static inline common::offset_t getStartOffsetForNodeGroup(
static inline common::offset_t getStartOffsetOfNodeGroup(
common::node_group_idx_t nodeGroupIdx) {
return nodeGroupIdx << common::StorageConstants::NODE_GROUP_SIZE_LOG2;
}

static inline common::node_group_idx_t getNodeGroupIdxFromNodeOffset(
common::offset_t nodeOffset) {
static inline common::offset_t getStartOffsetOfVector(common::vector_idx_t vectorIdx) {
return vectorIdx << common::DEFAULT_VECTOR_CAPACITY_LOG_2;
}
static inline common::node_group_idx_t getNodeGroupIdx(common::offset_t nodeOffset) {
return nodeOffset >> common::StorageConstants::NODE_GROUP_SIZE_LOG2;
}

static inline common::vector_idx_t getVectorIdx(common::offset_t offsetInChunk) {
return offsetInChunk >> common::DEFAULT_VECTOR_CAPACITY_LOG_2;
}
static inline common::vector_idx_t getVectorIdxInChunk(
common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx) {
return (nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx)) >>
common::DEFAULT_VECTOR_CAPACITY_LOG_2;

Check warning on line 106 in src/include/storage/storage_utils.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/storage_utils.h#L105-L106

Added lines #L105 - L106 were not covered by tests
}
static inline std::pair<common::vector_idx_t, common::offset_t>
getVectorIdxInChunkAndOffsetInVector(
common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx) {
auto startOffsetOfNodeGroup = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
auto offsetInChunk = nodeOffset - startOffsetOfNodeGroup;
auto vectorIdx = getVectorIdx(offsetInChunk);
return std::make_pair(vectorIdx, offsetInChunk - getStartOffsetOfVector(vectorIdx));
}

static std::string getNodeIndexFName(const std::string& directory,
const common::table_id_t& tableID, common::DBFileType dbFileType);

Expand Down
Loading