Skip to content

Commit

Permalink
basic framework of local storage for node group
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Aug 15, 2023
1 parent b0bbb09 commit e72a2cb
Show file tree
Hide file tree
Showing 45 changed files with 929 additions and 321 deletions.
3 changes: 2 additions & 1 deletion dataset/copy-fault-tests/duplicate-ids/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
create node table person (ID INT64, fName STRING, PRIMARY KEY (ID));
create node table person (ID INT64, fName STRING, PRIMARY KEY (ID));
create node table org (ID STRING, fName STRING, PRIMARY KEY (ID));
4 changes: 4 additions & 0 deletions dataset/copy-fault-tests/duplicate-ids/vOrg.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
10,Guodong
24,Semih
31,Xiyang
10,Ziyi
5 changes: 5 additions & 0 deletions src/common/exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ std::string ExceptionMessage::invalidPKType(const std::string& type) {
type);
}

// Builds the error text for an input string that is too long. `length` is the already
// stringified length of the offending input; it is handed to StringUtils::string_format together
// with the message template below.
std::string ExceptionMessage::overLargeStringValueException(const std::string& length) {
    const char* const messageTemplate =
        "Maximum length of strings is 4096. Input string's length is {}.";
    return StringUtils::string_format(messageTemplate, length);
}

} // namespace common
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct ExceptionMessage {
// Returns the fixed message stating that COPY can be executed only once on a table.
static inline std::string notAllowCopyOnNonEmptyTableException() {
    std::string message = "COPY commands can only be executed once on a table.";
    return message;
}
static std::string overLargeStringValueException(const std::string& length);
};

class Exception : public std::exception {
Expand Down
6 changes: 3 additions & 3 deletions src/include/common/types/ku_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace common {

struct ku_string_t {

static const uint64_t PREFIX_LENGTH = 4;
static const uint64_t INLINED_SUFFIX_LENGTH = 8;
static const uint64_t SHORT_STR_LENGTH = PREFIX_LENGTH + INLINED_SUFFIX_LENGTH;
static constexpr uint64_t PREFIX_LENGTH = 4;
static constexpr uint64_t INLINED_SUFFIX_LENGTH = 8;
static constexpr uint64_t SHORT_STR_LENGTH = PREFIX_LENGTH + INLINED_SUFFIX_LENGTH;

uint32_t len;
uint8_t prefix[PREFIX_LENGTH];
Expand Down
1 change: 1 addition & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ValueVector {

inline uint32_t getNumBytesPerValue() const { return numBytesPerValue; }

// TODO(Guodong): Rename this to getValueRef
// Returns a reference to the element at index `pos`, viewing the value buffer as an array of T.
// No bounds or type check is performed here.
template<typename T>
inline T& getValue(uint32_t pos) const {
    auto* typedValues = reinterpret_cast<T*>(valueBuffer.get());
    return typedValues[pos];
}
Expand Down
4 changes: 2 additions & 2 deletions src/include/main/storage_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class StorageDriver {
uint64_t getNumRels(const std::string& relName);

private:
void scanColumn(
storage::NodeColumn* column, common::offset_t* offsets, size_t size, uint8_t* result);
void scanColumn(transaction::Transaction* transaction, storage::NodeColumn* column,
common::offset_t* offsets, size_t size, uint8_t* result);

private:
catalog::Catalog* catalog;
Expand Down
30 changes: 17 additions & 13 deletions src/include/processor/operator/persistent/set_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class NodeSetExecutor {

void init(ResultSet* resultSet, ExecutionContext* context);

virtual void set() = 0;
virtual void set(ExecutionContext* context) = 0;

virtual std::unique_ptr<NodeSetExecutor> copy() const = 0;

Expand All @@ -37,46 +37,50 @@ class NodeSetExecutor {
common::ValueVector* rhsVector = nullptr;
};

// Describes the target of a node property SET: the node table and the ID of the property.
struct NodeSetInfo {
    // Raw pointer to the table; this struct does not manage its lifetime. Defaults to nullptr so
    // that a default-constructed NodeSetInfo never carries an indeterminate pointer.
    storage::NodeTable* table = nullptr;
    // ID of the property to set.
    // NOTE(review): left without a default because no invalid-ID sentinel is visible here;
    // callers must always assign it.
    common::property_id_t propertyID;
};

class SingleLabelNodeSetExecutor : public NodeSetExecutor {
public:
SingleLabelNodeSetExecutor(storage::NodeColumn* column, const DataPos& nodeIDPos,
SingleLabelNodeSetExecutor(NodeSetInfo setInfo, const DataPos& nodeIDPos,
const DataPos& lhsVectorPos, std::unique_ptr<evaluator::ExpressionEvaluator> evaluator)
: NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, column{column} {}
: NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, setInfo{setInfo} {}

SingleLabelNodeSetExecutor(const SingleLabelNodeSetExecutor& other)
: NodeSetExecutor{other.nodeIDPos, other.lhsVectorPos, other.evaluator->clone()},
column{other.column} {}
setInfo(other.setInfo) {}

void set() final;
void set(ExecutionContext* context) final;

inline std::unique_ptr<NodeSetExecutor> copy() const final {
return std::make_unique<SingleLabelNodeSetExecutor>(*this);
}

private:
storage::NodeColumn* column;
NodeSetInfo setInfo;
};

class MultiLabelNodeSetExecutor : public NodeSetExecutor {
public:
MultiLabelNodeSetExecutor(
std::unordered_map<common::table_id_t, storage::NodeColumn*> tableIDToColumn,
MultiLabelNodeSetExecutor(std::unordered_map<common::table_id_t, NodeSetInfo> tableIDToSetInfo,
const DataPos& nodeIDPos, const DataPos& lhsVectorPos,
std::unique_ptr<evaluator::ExpressionEvaluator> evaluator)
: NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)}, tableIDToColumn{std::move(
tableIDToColumn)} {}
: NodeSetExecutor{nodeIDPos, lhsVectorPos, std::move(evaluator)},
tableIDToSetInfo{std::move(tableIDToSetInfo)} {}
MultiLabelNodeSetExecutor(const MultiLabelNodeSetExecutor& other)
: NodeSetExecutor{other.nodeIDPos, other.lhsVectorPos, other.evaluator->clone()},
tableIDToColumn{other.tableIDToColumn} {}
tableIDToSetInfo{other.tableIDToSetInfo} {}

void set() final;
void set(ExecutionContext* context) final;

inline std::unique_ptr<NodeSetExecutor> copy() const final {
return std::make_unique<MultiLabelNodeSetExecutor>(*this);
}

private:
std::unordered_map<common::table_id_t, storage::NodeColumn*> tableIDToColumn;
std::unordered_map<common::table_id_t, NodeSetInfo> tableIDToSetInfo;
};

class RelSetExecutor {
Expand Down
11 changes: 11 additions & 0 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ namespace storage {

class NullColumnChunk;

// Page-location metadata for a column chunk: a page index and a page count.
struct ColumnChunkMetadata {
// Page index of the chunk; INVALID_PAGE_IDX until assigned.
// NOTE(review): presumably the index of the chunk's first page — confirm against users.
common::page_idx_t pageIdx = common::INVALID_PAGE_IDX;
// Number of pages; 0 by default.
common::page_idx_t numPages = 0;
};

// Metadata for an overflow column chunk: the inherited page index / page count plus an offset
// within a page.
struct OverflowColumnChunkMetadata : public ColumnChunkMetadata {
    // Zero by default so that a default-constructed instance is fully initialized, in line with
    // the inherited members, which all carry defaults.
    // NOTE(review): presumably the write position within the last overflow page — confirm.
    common::offset_t lastOffsetInPage = 0;
};

// Base data segment covers all fixed-sized data types.
// Some template functions are almost duplicated from `InMemColumnChunk`, which is intended.
// Currently, `InMemColumnChunk` is used to populate rel columns. Eventually, we will merge them.
Expand Down Expand Up @@ -76,6 +85,8 @@ class ColumnChunk {
}

// Accessor for `numBytesPerValue`.
inline uint64_t getNumBytesPerValue() const {
    return numBytesPerValue;
}
// Accessor for `numBytes`.
// NOTE(review): presumably the size of the chunk's buffer in bytes — confirm.
inline uint64_t getNumBytes() const {
    return numBytes;
}

Check warning on line 88 in src/include/storage/copier/column_chunk.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/copier/column_chunk.h#L88

Added line #L88 was not covered by tests
// Returns the raw pointer held by `buffer`; the caller gets mutable access to the bytes.
inline uint8_t* getData() {
    return buffer.get();
}

virtual void write(const common::Value& val, uint64_t posToWrite);

Expand Down
7 changes: 6 additions & 1 deletion src/include/storage/copier/string_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class StringColumnChunk : public ColumnChunk {
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

virtual void update(common::ValueVector* vector, common::vector_idx_t vectorIdx);

template<typename T>
void setValueFromString(const char* value, uint64_t length, uint64_t pos) {
throw common::NotImplementedException("VarSizedColumnChunk::setValueFromString");
Expand All @@ -28,11 +30,14 @@ class StringColumnChunk : public ColumnChunk {

common::page_idx_t flushOverflowBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);

private:
// Returns the raw pointer held by `overflowFile`; the member keeps holding the object.
inline InMemOverflowFile* getOverflowFile() {
    return overflowFile.get();
}
// Returns the current `offsetInPage` of the overflow cursor. Marked const because it only reads
// state, so it can also be called through a const StringColumnChunk.
inline common::offset_t getLastOffsetInPage() const { return overflowCursor.offsetInPage; }

// Total page count for this chunk: the pages reported by the base ColumnChunk plus the pages
// reported by the overflow file.
inline common::page_idx_t getNumPages() const final {
    const auto numPagesInChunk = ColumnChunk::getNumPages();
    const auto numPagesInOverflow = overflowFile->getNumPages();
    return numPagesInChunk + numPagesInOverflow;
}

private:
template<typename T>
void templateCopyStringArrowArray(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
Expand Down
39 changes: 39 additions & 0 deletions src/include/storage/local_storage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include "local_table.h"

namespace kuzu {
namespace storage {
class NodesStore;

// Data structures in LocalStorage are not thread-safe.
// For now, we only support single-threaded insertions and updates. Once we optimize them with
// multiple threads, LocalStorage and its related data structures should be reworked to be
// thread-safe.
//
// LocalStorage holds LocalTable instances keyed by table ID and exposes scan / lookup / update
// entry points addressed by table ID, plus prepareCommit / prepareRollback hooks. All member
// functions are defined out of line.
class LocalStorage {
public:
// NOTE(review): presumably obtains `nodesStore` from `storageManager` and stores `mm` — confirm
// against the definition.
LocalStorage(storage::StorageManager* storageManager, storage::MemoryManager* mm);

// scan and lookup take the node IDs in `nodeIDVector`, the requested column IDs, and the vectors
// to write results into.
// NOTE(review): presumably `columnIDs[i]` pairs with `outputVectors[i]` — confirm.
void scan(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
// Vector form of update: node IDs and new property values are both passed as vectors.
void update(common::table_id_t tableID, common::property_id_t propertyID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
// Single-node form of update: the node is given by `nodeOffset`, and the new value is the entry
// at `posInPropertyVector` of `propertyVector`.
void update(common::table_id_t tableID, common::property_id_t propertyID,
common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector);

void prepareCommit();
void prepareRollback();

private:
// Local tables owned by this storage, ordered by table ID.
std::map<common::table_id_t, std::unique_ptr<LocalTable>> tables;
// Raw pointers; this class declares no destructor that would release them.
storage::NodesStore* nodesStore;
storage::MemoryManager* mm;
};

} // namespace storage
} // namespace kuzu
123 changes: 123 additions & 0 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#pragma once

#include <bitset>

#include "common/vector/value_vector.h"

namespace kuzu {
namespace storage {
class NodeColumn;
class NodeTable;

class LocalVector {
public:
LocalVector(const common::LogicalType& logicalType, storage::MemoryManager* mm) {
vector = std::make_unique<common::ValueVector>(logicalType, mm);
vector->setState(std::make_shared<common::DataChunkState>());
vector->state->selVector->resetSelectorToValuePosBuffer();
}

virtual ~LocalVector() = default;

void scan(common::ValueVector* resultVector) const;
void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector);
virtual void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector);

std::unique_ptr<common::ValueVector> vector;
// This mask is mainly to speed the lookup operation up. Otherwise, we have to do binary search
// to check if the value at an offset has been updated or not.
std::bitset<common::DEFAULT_VECTOR_CAPACITY> validityMask;
};

// LocalVector specialization for STRING values; overrides update() and keeps a length counter.
class StringLocalVector : public LocalVector {
public:
    // Creates the underlying LocalVector with the STRING logical type.
    explicit StringLocalVector(storage::MemoryManager* mm)
        : LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm} {}

    void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
        common::sel_t offsetInUpdateVector) final;

    // Starts at 0.
    // NOTE(review): presumably the accumulated length of overflow strings — confirm against
    // update()'s definition.
    uint64_t ovfStringLength = 0;
};

// Factory for LocalVector instances; the function is defined out of line.
struct LocalVectorFactory {
// Returns an owning pointer to a LocalVector for `logicalType`.
// NOTE(review): presumably picks StringLocalVector for STRING types — confirm.
static std::unique_ptr<LocalVector> createLocalVectorData(
const common::LogicalType& logicalType, storage::MemoryManager* mm);
};

// Locally buffered data of one column chunk, kept as LocalVectors keyed by vector index.
class LocalColumnChunk {
public:
    // Only records the memory manager; no vectors exist until they are added later.
    explicit LocalColumnChunk(storage::MemoryManager* mm) : mm{mm} {}

    // The operations below are defined out of line. `vectorIdx` addresses an entry of `vectors`;
    // `offsetInVector` addresses a position inside that vector.
    void scan(common::vector_idx_t vectorIdx, common::ValueVector* resultVector);
    void lookup(common::vector_idx_t vectorIdx, common::sel_t offsetInVector,
        common::ValueVector* resultVector);
    void update(common::vector_idx_t vectorIdx, common::sel_t offsetInVector,
        common::ValueVector* vectorToWriteFrom, common::sel_t pos);

    // Owned local vectors, ordered by vector index.
    std::map<common::vector_idx_t, std::unique_ptr<LocalVector>> vectors;
    storage::MemoryManager* mm;
};

// Locally buffered data for one NodeColumn, kept as LocalColumnChunks keyed by node group index.
class LocalColumn {
public:
    // Only records the column pointer; chunks are added later.
    explicit LocalColumn(storage::NodeColumn* column) : column{column} {}
    virtual ~LocalColumn() = default;

    // The operations below are defined out of line.
    void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
    void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
    // Vector form: node IDs and new values are both passed as vectors.
    void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector,
        storage::MemoryManager* mm);
    // Single-node form: the value is the entry at `posInPropertyVector` of `propertyVector`.
    void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
        common::sel_t posInPropertyVector, storage::MemoryManager* mm);

    virtual void prepareCommit();

protected:
    // Per-node-group commit hook; virtual so that subclasses can replace it.
    virtual void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx);

    // Owned local chunks, ordered by node group index.
    std::map<common::node_group_idx_t, std::unique_ptr<LocalColumnChunk>> chunks;
    storage::NodeColumn* column;
};

// LocalColumn specialization that overrides the per-chunk commit step.
class StringLocalColumn : public LocalColumn {
public:
    explicit StringLocalColumn(storage::NodeColumn* column) : LocalColumn{column} {}

private:
    // Both functions are defined out of line.
    void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
    void commitLocalChunkOutOfPlace(
        common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
};

// Factory for LocalColumn instances; the function is defined out of line.
struct LocalColumnFactory {
// Returns an owning pointer to a LocalColumn wrapping `column`.
// NOTE(review): presumably picks StringLocalColumn for string columns — confirm.
static std::unique_ptr<LocalColumn> createLocalColumn(storage::NodeColumn* column);
};

// Locally buffered data for one NodeTable, kept as LocalColumns keyed by property ID.
class LocalTable {
public:
    // Only records the table pointer; columns are added later.
    explicit LocalTable(storage::NodeTable* table) : table{table} {}

    // The operations below are defined out of line.
    void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
        const std::vector<common::ValueVector*>& outputVectors);
    void lookup(common::ValueVector* nodeIDVector,
        const std::vector<common::column_id_t>& columnIDs,
        const std::vector<common::ValueVector*>& outputVectors);
    // Vector form: node IDs and new values are both passed as vectors.
    void update(common::property_id_t propertyID, common::ValueVector* nodeIDVector,
        common::ValueVector* propertyVector, storage::MemoryManager* mm);
    // Single-node form: the value is the entry at `posInPropertyVector` of `propertyVector`.
    void update(common::property_id_t propertyID, common::offset_t nodeOffset,
        common::ValueVector* propertyVector, common::sel_t posInPropertyVector,
        storage::MemoryManager* mm);

    void prepareCommit();

private:
    // Owned local columns, ordered by property ID.
    std::map<common::property_id_t, std::unique_ptr<LocalColumn>> columns;
    storage::NodeTable* table;
};

} // namespace storage
} // namespace kuzu
26 changes: 22 additions & 4 deletions src/include/storage/storage_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,34 @@ struct PageUtils {

class StorageUtils {
public:
static inline common::offset_t getStartOffsetForNodeGroup(
// Maps a node group index to a node offset by shifting it left by NODE_GROUP_SIZE_LOG2.
static inline common::offset_t getStartOffsetOfNodeGroup(
    common::node_group_idx_t nodeGroupIdx) {
    const auto groupSizeLog2 = common::StorageConstants::NODE_GROUP_SIZE_LOG2;
    return nodeGroupIdx << groupSizeLog2;
}

static inline common::node_group_idx_t getNodeGroupIdxFromNodeOffset(
common::offset_t nodeOffset) {
// Maps a vector index to an offset by shifting it left by DEFAULT_VECTOR_CAPACITY_LOG_2.
// The index is widened to offset_t first: a shift is evaluated in the (promoted) type of its left
// operand, so if vector_idx_t is narrower than offset_t the unwidened shift could wrap before
// the result is converted to the return type.
static inline common::offset_t getStartOffsetOfVector(common::vector_idx_t vectorIdx) {
    return static_cast<common::offset_t>(vectorIdx) << common::DEFAULT_VECTOR_CAPACITY_LOG_2;
}
// Maps a node offset to a node group index by shifting it right by NODE_GROUP_SIZE_LOG2.
static inline common::node_group_idx_t getNodeGroupIdx(common::offset_t nodeOffset) {
    const auto groupSizeLog2 = common::StorageConstants::NODE_GROUP_SIZE_LOG2;
    return nodeOffset >> groupSizeLog2;
}

// Maps an offset inside a chunk to a vector index by shifting it right by
// DEFAULT_VECTOR_CAPACITY_LOG_2. The conversion to vector_idx_t is spelled out; it is the same
// conversion the return statement would otherwise perform implicitly.
static inline common::vector_idx_t getVectorIdx(common::offset_t offsetInChunk) {
    return static_cast<common::vector_idx_t>(
        offsetInChunk >> common::DEFAULT_VECTOR_CAPACITY_LOG_2);
}
static inline common::vector_idx_t getVectorIdxInChunk(
common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx) {
return (nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx)) >>
common::DEFAULT_VECTOR_CAPACITY_LOG_2;

Check warning on line 106 in src/include/storage/storage_utils.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/storage_utils.h#L105-L106

Added lines #L105 - L106 were not covered by tests
}
// Splits a node offset, relative to the start of node group `nodeGroupIdx`, into
// (index of the vector inside the chunk, offset inside that vector).
static inline std::pair<common::vector_idx_t, common::offset_t>
getVectorIdxInChunkAndOffsetInVector(
    common::offset_t nodeOffset, common::node_group_idx_t nodeGroupIdx) {
    // Position of the node relative to the first node of its group.
    const auto offsetInChunk = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
    const auto vectorIdx = getVectorIdx(offsetInChunk);
    // What remains after removing the offset at which that vector starts.
    const auto offsetInVector = offsetInChunk - getStartOffsetOfVector(vectorIdx);
    return std::make_pair(vectorIdx, offsetInVector);
}

static std::string getNodeIndexFName(const std::string& directory,
const common::table_id_t& tableID, common::DBFileType dbFileType);

Expand Down
Loading

0 comments on commit e72a2cb

Please sign in to comment.