Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework local storage for node table and fix #2376 #2394

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ void ListAuxiliaryBuffer::resize(uint64_t numValues) {
bool needResizeDataVector = numValues > capacity;
while (numValues > capacity) {
capacity *= 2;
KU_ASSERT(capacity != 0);
}
if (needResizeDataVector) {
resizeDataVector(dataVector.get());
Expand Down
2 changes: 2 additions & 0 deletions src/include/planner/operator/persistent/logical_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class LogicalSetNodeProperty : public LogicalOperator {
return infos;
}

f_group_pos_set getGroupsPosToFlatten(uint32_t idx);

std::string getExpressionsForPrinting() const final;

inline std::unique_ptr<LogicalOperator> copy() final {
Expand Down
34 changes: 23 additions & 11 deletions src/include/storage/local_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,48 @@

namespace kuzu {
namespace storage {
class StorageManager;

class Column;

// Data structures in LocalStorage are not thread-safe.
// For now, we only support single thread insertions and updates. Once we optimize them with
// multiple threads, LocalStorage and its related data structures should be reworked to be
// thread-safe.
class LocalStorage {
public:
LocalStorage(storage::StorageManager* storageManager, storage::MemoryManager* mm);
LocalStorage(storage::MemoryManager* mm);

void scan(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void update(common::table_id_t tableID, common::column_id_t columnID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
void update(common::table_id_t tableID, common::column_id_t columnID,
common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector);

void prepareCommit();
void prepareRollback();
// Note: `initializeLocalTable` should be called before `insert` and `update`.
void initializeLocalTable(
common::table_id_t tableID, const std::vector<std::unique_ptr<Column>>& columns);
void insert(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::table_id_t tableID, common::ValueVector* nodeIDVector,
common::column_id_t columnID, common::ValueVector* propertyVector);
void delete_(common::table_id_t tableID, common::ValueVector* nodeIDVector);

inline std::unordered_set<common::table_id_t> getTableIDsWithUpdates() {
std::unordered_set<common::table_id_t> tableSetToUpdate;
for (auto& [tableID, _] : tables) {
tableSetToUpdate.insert(tableID);
}
return tableSetToUpdate;
}
inline LocalTable* getLocalTable(common::table_id_t tableID) {
KU_ASSERT(tables.contains(tableID));
return tables.at(tableID).get();
}

private:
std::map<common::table_id_t, std::unique_ptr<LocalTable>> tables;
StorageManager* storageManager;
storage::MemoryManager* mm;
bool enableCompression;
};

} // namespace storage
Expand Down
221 changes: 100 additions & 121 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
@@ -1,169 +1,148 @@
#pragma once

#include <bitset>
#include <map>

#include "common/vector/value_vector.h"

namespace kuzu {
namespace storage {
class Column;
class NodeTable;
class TableData;

// TODO(Guodong): Instead of using ValueVector, we should switch to ColumnChunk.
// This class is used to store a chunk of local changes to a column in a node group.
// Values are stored inside `vector`.
class LocalVector {
public:
LocalVector(const common::LogicalType& dataType, MemoryManager* mm) {
LocalVector(const common::LogicalType& dataType, MemoryManager* mm) : numValues{0} {
vector = std::make_unique<common::ValueVector>(dataType, mm);
vector->setState(std::make_shared<common::DataChunkState>());
vector->state->selVector->resetSelectorToValuePosBuffer();
vector->state = std::make_unique<common::DataChunkState>();
vector->state->selVector->resetSelectorToValuePosBufferWithSize(1);
}

virtual ~LocalVector() = default;

virtual void scan(common::ValueVector* resultVector) const;
virtual void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
void read(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
common::sel_t offsetInResultVector);
virtual void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector);

std::unique_ptr<common::ValueVector> vector;
// This mask is mainly to speed the lookup operation up. Otherwise, we have to do binary search
// to check if the value at an offset has been updated or not.
std::bitset<common::DEFAULT_VECTOR_CAPACITY> validityMask;
};

class StringLocalVector : public LocalVector {
public:
explicit StringLocalVector(MemoryManager* mm)
: LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, ovfStringLength{
0} {};
void append(common::ValueVector* valueVector);

void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector) final;
inline common::ValueVector* getVector() { return vector.get(); }
inline bool isFull() { return numValues == common::DEFAULT_VECTOR_CAPACITY; }

uint64_t ovfStringLength;
private:
std::unique_ptr<common::ValueVector> vector;
common::sel_t numValues;
};

class StructLocalVector : public LocalVector {
// This class is used to store local changes of a column in a node group.
// It consists of a collection of LocalVector, each of which is a chunk of the local changes.
// By default, the size of each vector (chunk) is DEFAULT_VECTOR_CAPACITY, and the collection
// contains 64 vectors (chunks).
class LocalVectorCollection {
public:
explicit StructLocalVector(MemoryManager* mm)
: LocalVector{common::LogicalType{common::LogicalTypeID::STRUCT,
std::make_unique<common::StructTypeInfo>()},
mm} {}

void scan(common::ValueVector* resultVector) const final;
void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
common::sel_t offsetInResultVector) final;
void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector) final;
};

struct LocalVectorFactory {
static std::unique_ptr<LocalVector> createLocalVectorData(
const common::LogicalType& logicalType, MemoryManager* mm);
};
LocalVectorCollection(const common::LogicalType* dataType, MemoryManager* mm)
: dataType{dataType}, mm{mm}, numRows{0} {}

void read(common::row_idx_t rowIdx, common::ValueVector* outputVector,
common::sel_t posInOutputVector);
void insert(common::ValueVector* nodeIDVector, common::ValueVector* propertyVectors);
void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
inline void delete_(common::offset_t nodeOffset) {
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
insertInfo.erase(nodeOffset);
updateInfo.erase(nodeOffset);
}
inline std::map<common::offset_t, common::row_idx_t>& getInsertInfoRef() { return insertInfo; }
inline std::map<common::offset_t, common::row_idx_t>& getUpdateInfoRef() { return updateInfo; }
inline uint64_t getNumRows() { return numRows; }
inline LocalVector* getLocalVector(common::row_idx_t rowIdx) {
auto vectorIdx = rowIdx >> common::DEFAULT_VECTOR_CAPACITY_LOG_2;
KU_ASSERT(vectorIdx < vectors.size());
return vectors[vectorIdx].get();
}

class LocalColumnChunk {
public:
explicit LocalColumnChunk(const common::LogicalType& dataType, MemoryManager* mm)
: dataType{dataType}, mm{mm} {};
common::row_idx_t getRowIdx(common::offset_t nodeOffset);

void scan(common::vector_idx_t vectorIdx, common::ValueVector* resultVector);
void lookup(common::vector_idx_t vectorIdx, common::sel_t offsetInLocalVector,
common::ValueVector* resultVector, common::sel_t offsetInResultVector);
void update(common::vector_idx_t vectorIdx, common::sel_t offsetInVector,
common::ValueVector* vectorToWriteFrom, common::sel_t pos);
private:
void prepareAppend();
void append(common::ValueVector* vector);

std::map<common::vector_idx_t, std::unique_ptr<LocalVector>> vectors;
common::LogicalType dataType;
private:
const common::LogicalType* dataType;
MemoryManager* mm;
std::vector<std::unique_ptr<LocalVector>> vectors;
common::row_idx_t numRows;
// TODO: Do we need to differentiate between insert and update?
// New nodes to be inserted into the persistent storage.
std::map<common::offset_t, common::row_idx_t> insertInfo;
// Nodes in the persistent storage to be updated.
std::map<common::offset_t, common::row_idx_t> updateInfo;
};

class LocalColumn {
public:
explicit LocalColumn(Column* column, bool enableCompression)
: column{column}, enableCompression{enableCompression} {};
virtual ~LocalColumn() = default;

virtual void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void update(
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector, MemoryManager* mm);
virtual void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, MemoryManager* mm);

virtual void prepareCommit();

virtual void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx);
void commitLocalChunkOutOfPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
void commitLocalChunkInPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);

protected:
std::map<common::node_group_idx_t, std::unique_ptr<LocalColumnChunk>> chunks;
Column* column;
bool enableCompression;
};

class StringLocalColumn : public LocalColumn {
public:
explicit StringLocalColumn(Column* column, bool enableCompression)
: LocalColumn{column, enableCompression} {};

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
};

class VarListLocalColumn : public LocalColumn {
public:
explicit VarListLocalColumn(Column* column, bool enableCompression)
: LocalColumn{column, enableCompression} {};
class LocalNodeGroup {
friend class NodeTableData;

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
};

class StructLocalColumn : public LocalColumn {
public:
explicit StructLocalColumn(Column* column, bool enableCompression);

void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;
void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;
void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector,
MemoryManager* mm) final;
void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, MemoryManager* mm) final;
LocalNodeGroup(
const std::vector<std::unique_ptr<common::LogicalType>>& dataTypes, MemoryManager* mm) {
columns.resize(dataTypes.size());
for (auto i = 0u; i < dataTypes.size(); ++i) {
// To avoid unnecessary memory consumption, we chunk local changes of each column in the
// node group into chunks of size DEFAULT_VECTOR_CAPACITY.
columns[i] = std::make_unique<LocalVectorCollection>(dataTypes[i].get(), mm);
}
}

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::offset_t nodeOffset, common::column_id_t columnID,
common::ValueVector* outputVector, common::sel_t posInOutputVector);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

inline LocalVectorCollection* getLocalColumnChunk(common::column_id_t columnID) {
return columns[columnID].get();
}

private:
std::vector<std::unique_ptr<LocalColumn>> fields;
};
inline common::row_idx_t getRowIdx(common::column_id_t columnID, common::offset_t nodeOffset) {
KU_ASSERT(columnID < columns.size());
return columns[columnID]->getRowIdx(nodeOffset);
}

struct LocalColumnFactory {
static std::unique_ptr<LocalColumn> createLocalColumn(Column* column, bool enableCompression);
private:
std::vector<std::unique_ptr<LocalVectorCollection>> columns;
};

class LocalTable {
friend class NodeTableData;

public:
explicit LocalTable(NodeTable* table, bool enableCompression)
: table{table}, enableCompression{enableCompression} {};
explicit LocalTable(common::table_id_t tableID,
std::vector<std::unique_ptr<common::LogicalType>> dataTypes, MemoryManager* mm)
: tableID{tableID}, dataTypes{std::move(dataTypes)}, mm{mm} {};

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void update(common::column_id_t columnID, common::ValueVector* nodeIDVector,
common::ValueVector* propertyVector, MemoryManager* mm);
void update(common::column_id_t columnID, common::offset_t nodeOffset,
common::ValueVector* propertyVector, common::sel_t posInPropertyVector, MemoryManager* mm);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

inline void clear() { nodeGroups.clear(); }

void prepareCommit();
private:
common::node_group_idx_t initializeLocalNodeGroup(common::ValueVector* nodeIDVector);
common::node_group_idx_t initializeLocalNodeGroup(common::offset_t nodeOffset);

private:
std::map<common::column_id_t, std::unique_ptr<LocalColumn>> columns;
NodeTable* table;
bool enableCompression;
common::table_id_t tableID;
std::vector<std::unique_ptr<common::LogicalType>> dataTypes;
MemoryManager* mm;
std::unordered_map<common::node_group_idx_t, std::unique_ptr<LocalNodeGroup>> nodeGroups;
};

} // namespace storage
Expand Down
9 changes: 4 additions & 5 deletions src/include/storage/storage_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ class StorageManager {
StorageManager(bool readOnly, const catalog::Catalog& catalog, MemoryManager& memoryManager,
WAL* wal, bool enableCompression);

void createTable(
common::table_id_t tableID, BufferManager* bufferManager, catalog::Catalog* catalog);
void createTable(common::table_id_t tableID, catalog::Catalog* catalog);
void dropTable(common::table_id_t tableID);

void prepareCommit();
void prepareRollback();
void prepareCommit(transaction::Transaction* transaction);
void prepareRollback(transaction::Transaction* transaction);
void checkpointInMemory();
void rollbackInMemory();

Expand Down Expand Up @@ -49,7 +48,7 @@ class StorageManager {
inline bool compressionEnabled() const { return enableCompression; }

private:
void loadTables(bool readOnly, const catalog::Catalog& catalog, BufferManager* bufferManager);
void loadTables(bool readOnly, const catalog::Catalog& catalog);

private:
std::unique_ptr<BMFileHandle> dataFH;
Expand Down
14 changes: 14 additions & 0 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class Column {
return metadataDA->getNumElements(transaction->getType());
}

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localColumnChunk, bool isNewNodeGroup);
virtual void checkpointInMemory();
virtual void rollbackInMemory();

Expand Down Expand Up @@ -102,6 +104,18 @@ class Column {
WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset);

private:
static bool containsVarList(common::LogicalType& dataType);
bool canCommitInPlace(common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk);
void commitLocalChunkInPlace(LocalVectorCollection* localChunk);
void commitLocalChunkOutOfPlace(common::node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localChunk, bool isNewNodeGroup);

void applyLocalChunkToColumnChunk(LocalVectorCollection* localChunk, ColumnChunk* columnChunk,
common::offset_t nodeGroupStartOffset,
const std::map<common::offset_t, common::row_idx_t>& updateInfo);
void applyLocalChunkToColumn(LocalVectorCollection* localChunk,
const std::map<common::offset_t, common::row_idx_t>& updateInfo);

// check if val is in range [start, end)
static inline bool isInRange(uint64_t val, uint64_t start, uint64_t end) {
return val >= start && val < end;
Expand Down
Loading