Skip to content

Commit

Permalink
Merge pull request #2394 from kuzudb/rel-updates
Browse files Browse the repository at this point in the history
Rework local storage for node table and fix #2376
  • Loading branch information
ray6080 committed Nov 14, 2023
2 parents 684511c + 5b73d7b commit ba5b927
Show file tree
Hide file tree
Showing 57 changed files with 845 additions and 770 deletions.
1 change: 1 addition & 0 deletions src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ void ListAuxiliaryBuffer::resize(uint64_t numValues) {
bool needResizeDataVector = numValues > capacity;
while (numValues > capacity) {
capacity *= 2;
KU_ASSERT(capacity != 0);
}
if (needResizeDataVector) {
resizeDataVector(dataVector.get());
Expand Down
2 changes: 2 additions & 0 deletions src/include/planner/operator/persistent/logical_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class LogicalSetNodeProperty : public LogicalOperator {
return infos;
}

f_group_pos_set getGroupsPosToFlatten(uint32_t idx);

std::string getExpressionsForPrinting() const final;

inline std::unique_ptr<LogicalOperator> copy() final {
Expand Down
34 changes: 23 additions & 11 deletions src/include/storage/local_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,48 @@

namespace kuzu {
namespace storage {
class StorageManager;

class Column;

// Data structures in LocalStorage are not thread-safe.
// For now, we only support single thread insertions and updates. Once we optimize them with
// multiple threads, LocalStorage and its related data structures should be reworked to be
// thread-safe.
class LocalStorage {
public:
LocalStorage(storage::StorageManager* storageManager, storage::MemoryManager* mm);
LocalStorage(storage::MemoryManager* mm);

void scan(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void update(common::table_id_t tableID, common::column_id_t columnID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
void update(common::table_id_t tableID, common::column_id_t columnID,
common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector);

void prepareCommit();
void prepareRollback();
// Note: `initializeLocalTable` should be called before `insert` and `update`.
void initializeLocalTable(
common::table_id_t tableID, const std::vector<std::unique_ptr<Column>>& columns);
void insert(common::table_id_t tableID, common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::table_id_t tableID, common::ValueVector* nodeIDVector,
common::column_id_t columnID, common::ValueVector* propertyVector);
void delete_(common::table_id_t tableID, common::ValueVector* nodeIDVector);

inline std::unordered_set<common::table_id_t> getTableIDsWithUpdates() {
std::unordered_set<common::table_id_t> tableSetToUpdate;
for (auto& [tableID, _] : tables) {
tableSetToUpdate.insert(tableID);
}
return tableSetToUpdate;
}
inline LocalTable* getLocalTable(common::table_id_t tableID) {
KU_ASSERT(tables.contains(tableID));
return tables.at(tableID).get();
}

private:
std::map<common::table_id_t, std::unique_ptr<LocalTable>> tables;
StorageManager* storageManager;
storage::MemoryManager* mm;
bool enableCompression;
};

} // namespace storage
Expand Down
221 changes: 100 additions & 121 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
@@ -1,169 +1,148 @@
#pragma once

#include <bitset>
#include <map>

#include "common/vector/value_vector.h"

namespace kuzu {
namespace storage {
class Column;
class NodeTable;
class TableData;

// TODO(Guodong): Instead of using ValueVector, we should switch to ColumnChunk.
// This class is used to store a chunk of local changes to a column in a node group.
// Values are stored inside `vector`.
class LocalVector {
public:
LocalVector(const common::LogicalType& dataType, MemoryManager* mm) {
LocalVector(const common::LogicalType& dataType, MemoryManager* mm) : numValues{0} {
vector = std::make_unique<common::ValueVector>(dataType, mm);
vector->setState(std::make_shared<common::DataChunkState>());
vector->state->selVector->resetSelectorToValuePosBuffer();
vector->state = std::make_unique<common::DataChunkState>();
vector->state->selVector->resetSelectorToValuePosBufferWithSize(1);
}

virtual ~LocalVector() = default;

virtual void scan(common::ValueVector* resultVector) const;
virtual void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
void read(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
common::sel_t offsetInResultVector);
virtual void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector);

std::unique_ptr<common::ValueVector> vector;
// This mask is mainly to speed the lookup operation up. Otherwise, we have to do binary search
// to check if the value at an offset has been updated or not.
std::bitset<common::DEFAULT_VECTOR_CAPACITY> validityMask;
};

class StringLocalVector : public LocalVector {
public:
explicit StringLocalVector(MemoryManager* mm)
: LocalVector{common::LogicalType(common::LogicalTypeID::STRING), mm}, ovfStringLength{
0} {};
void append(common::ValueVector* valueVector);

void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector) final;
inline common::ValueVector* getVector() { return vector.get(); }
inline bool isFull() { return numValues == common::DEFAULT_VECTOR_CAPACITY; }

uint64_t ovfStringLength;
private:
std::unique_ptr<common::ValueVector> vector;
common::sel_t numValues;
};

class StructLocalVector : public LocalVector {
// This class is used to store local changes of a column in a node group.
// It consists of a collection of LocalVector, each of which is a chunk of the local changes.
// By default, the size of each vector (chunk) is DEFAULT_VECTOR_CAPACITY, and the collection
// contains 64 vectors (chunks).
class LocalVectorCollection {
public:
explicit StructLocalVector(MemoryManager* mm)
: LocalVector{common::LogicalType{common::LogicalTypeID::STRUCT,
std::make_unique<common::StructTypeInfo>()},
mm} {}

void scan(common::ValueVector* resultVector) const final;
void lookup(common::sel_t offsetInLocalVector, common::ValueVector* resultVector,
common::sel_t offsetInResultVector) final;
void update(common::sel_t offsetInLocalVector, common::ValueVector* updateVector,
common::sel_t offsetInUpdateVector) final;
};

struct LocalVectorFactory {
static std::unique_ptr<LocalVector> createLocalVectorData(
const common::LogicalType& logicalType, MemoryManager* mm);
};
LocalVectorCollection(const common::LogicalType* dataType, MemoryManager* mm)
: dataType{dataType}, mm{mm}, numRows{0} {}

void read(common::row_idx_t rowIdx, common::ValueVector* outputVector,
common::sel_t posInOutputVector);
void insert(common::ValueVector* nodeIDVector, common::ValueVector* propertyVectors);
void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
inline void delete_(common::offset_t nodeOffset) {
insertInfo.erase(nodeOffset);
updateInfo.erase(nodeOffset);
}
inline std::map<common::offset_t, common::row_idx_t>& getInsertInfoRef() { return insertInfo; }
inline std::map<common::offset_t, common::row_idx_t>& getUpdateInfoRef() { return updateInfo; }
inline uint64_t getNumRows() { return numRows; }
inline LocalVector* getLocalVector(common::row_idx_t rowIdx) {
auto vectorIdx = rowIdx >> common::DEFAULT_VECTOR_CAPACITY_LOG_2;
KU_ASSERT(vectorIdx < vectors.size());
return vectors[vectorIdx].get();
}

class LocalColumnChunk {
public:
explicit LocalColumnChunk(const common::LogicalType& dataType, MemoryManager* mm)
: dataType{dataType}, mm{mm} {};
common::row_idx_t getRowIdx(common::offset_t nodeOffset);

void scan(common::vector_idx_t vectorIdx, common::ValueVector* resultVector);
void lookup(common::vector_idx_t vectorIdx, common::sel_t offsetInLocalVector,
common::ValueVector* resultVector, common::sel_t offsetInResultVector);
void update(common::vector_idx_t vectorIdx, common::sel_t offsetInVector,
common::ValueVector* vectorToWriteFrom, common::sel_t pos);
private:
void prepareAppend();
void append(common::ValueVector* vector);

std::map<common::vector_idx_t, std::unique_ptr<LocalVector>> vectors;
common::LogicalType dataType;
private:
const common::LogicalType* dataType;
MemoryManager* mm;
std::vector<std::unique_ptr<LocalVector>> vectors;
common::row_idx_t numRows;
// TODO: Do we need to differentiate between insert and update?
// New nodes to be inserted into the persistent storage.
std::map<common::offset_t, common::row_idx_t> insertInfo;
// Nodes in the persistent storage to be updated.
std::map<common::offset_t, common::row_idx_t> updateInfo;
};

class LocalColumn {
public:
explicit LocalColumn(Column* column, bool enableCompression)
: column{column}, enableCompression{enableCompression} {};
virtual ~LocalColumn() = default;

virtual void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void update(
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector, MemoryManager* mm);
virtual void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, MemoryManager* mm);

virtual void prepareCommit();

virtual void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx);
void commitLocalChunkOutOfPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
void commitLocalChunkInPlace(
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);

protected:
std::map<common::node_group_idx_t, std::unique_ptr<LocalColumnChunk>> chunks;
Column* column;
bool enableCompression;
};

class StringLocalColumn : public LocalColumn {
public:
explicit StringLocalColumn(Column* column, bool enableCompression)
: LocalColumn{column, enableCompression} {};

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
};

class VarListLocalColumn : public LocalColumn {
public:
explicit VarListLocalColumn(Column* column, bool enableCompression)
: LocalColumn{column, enableCompression} {};
class LocalNodeGroup {
friend class NodeTableData;

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
};

class StructLocalColumn : public LocalColumn {
public:
explicit StructLocalColumn(Column* column, bool enableCompression);

void scan(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;
void lookup(common::ValueVector* nodeIDVector, common::ValueVector* resultVector) final;
void update(common::ValueVector* nodeIDVector, common::ValueVector* propertyVector,
MemoryManager* mm) final;
void update(common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector, MemoryManager* mm) final;
LocalNodeGroup(
const std::vector<std::unique_ptr<common::LogicalType>>& dataTypes, MemoryManager* mm) {
columns.resize(dataTypes.size());
for (auto i = 0u; i < dataTypes.size(); ++i) {
// To avoid unnecessary memory consumption, we chunk local changes of each column in the
// node group into chunks of size DEFAULT_VECTOR_CAPACITY.
columns[i] = std::make_unique<LocalVectorCollection>(dataTypes[i].get(), mm);
}
}

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::offset_t nodeOffset, common::column_id_t columnID,
common::ValueVector* outputVector, common::sel_t posInOutputVector);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

inline LocalVectorCollection* getLocalColumnChunk(common::column_id_t columnID) {
return columns[columnID].get();
}

private:
std::vector<std::unique_ptr<LocalColumn>> fields;
};
inline common::row_idx_t getRowIdx(common::column_id_t columnID, common::offset_t nodeOffset) {
KU_ASSERT(columnID < columns.size());
return columns[columnID]->getRowIdx(nodeOffset);
}

struct LocalColumnFactory {
static std::unique_ptr<LocalColumn> createLocalColumn(Column* column, bool enableCompression);
private:
std::vector<std::unique_ptr<LocalVectorCollection>> columns;
};

class LocalTable {
friend class NodeTableData;

public:
explicit LocalTable(NodeTable* table, bool enableCompression)
: table{table}, enableCompression{enableCompression} {};
explicit LocalTable(common::table_id_t tableID,
std::vector<std::unique_ptr<common::LogicalType>> dataTypes, MemoryManager* mm)
: tableID{tableID}, dataTypes{std::move(dataTypes)}, mm{mm} {};

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void update(common::column_id_t columnID, common::ValueVector* nodeIDVector,
common::ValueVector* propertyVector, MemoryManager* mm);
void update(common::column_id_t columnID, common::offset_t nodeOffset,
common::ValueVector* propertyVector, common::sel_t posInPropertyVector, MemoryManager* mm);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

inline void clear() { nodeGroups.clear(); }

void prepareCommit();
private:
common::node_group_idx_t initializeLocalNodeGroup(common::ValueVector* nodeIDVector);
common::node_group_idx_t initializeLocalNodeGroup(common::offset_t nodeOffset);

private:
std::map<common::column_id_t, std::unique_ptr<LocalColumn>> columns;
NodeTable* table;
bool enableCompression;
common::table_id_t tableID;
std::vector<std::unique_ptr<common::LogicalType>> dataTypes;
MemoryManager* mm;
std::unordered_map<common::node_group_idx_t, std::unique_ptr<LocalNodeGroup>> nodeGroups;
};

} // namespace storage
Expand Down
9 changes: 4 additions & 5 deletions src/include/storage/storage_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ class StorageManager {
StorageManager(bool readOnly, const catalog::Catalog& catalog, MemoryManager& memoryManager,
WAL* wal, bool enableCompression);

void createTable(
common::table_id_t tableID, BufferManager* bufferManager, catalog::Catalog* catalog);
void createTable(common::table_id_t tableID, catalog::Catalog* catalog);
void dropTable(common::table_id_t tableID);

void prepareCommit();
void prepareRollback();
void prepareCommit(transaction::Transaction* transaction);
void prepareRollback(transaction::Transaction* transaction);
void checkpointInMemory();
void rollbackInMemory();

Expand Down Expand Up @@ -49,7 +48,7 @@ class StorageManager {
inline bool compressionEnabled() const { return enableCompression; }

private:
void loadTables(bool readOnly, const catalog::Catalog& catalog, BufferManager* bufferManager);
void loadTables(bool readOnly, const catalog::Catalog& catalog);

private:
std::unique_ptr<BMFileHandle> dataFH;
Expand Down
14 changes: 14 additions & 0 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class Column {
return metadataDA->getNumElements(transaction->getType());
}

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localColumnChunk, bool isNewNodeGroup);
virtual void checkpointInMemory();
virtual void rollbackInMemory();

Expand Down Expand Up @@ -102,6 +104,18 @@ class Column {
WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset);

private:
static bool containsVarList(common::LogicalType& dataType);
bool canCommitInPlace(common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk);
void commitLocalChunkInPlace(LocalVectorCollection* localChunk);
void commitLocalChunkOutOfPlace(common::node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localChunk, bool isNewNodeGroup);

void applyLocalChunkToColumnChunk(LocalVectorCollection* localChunk, ColumnChunk* columnChunk,
common::offset_t nodeGroupStartOffset,
const std::map<common::offset_t, common::row_idx_t>& updateInfo);
void applyLocalChunkToColumn(LocalVectorCollection* localChunk,
const std::map<common::offset_t, common::row_idx_t>& updateInfo);

// check if val is in range [start, end)
static inline bool isInRange(uint64_t val, uint64_t start, uint64_t end) {
return val >= start && val < end;
Expand Down
Loading

0 comments on commit ba5b927

Please sign in to comment.