Skip to content

Commit

Permalink
Merge pull request #2072 from kuzudb/refactor-storage-1
Browse files Browse the repository at this point in the history
Add abstraction of TableData
  • Loading branch information
ray6080 committed Sep 23, 2023
2 parents bb02646 + 177d8f9 commit b2ec22c
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 168 deletions.
5 changes: 2 additions & 3 deletions src/include/processor/operator/persistent/delete_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SingleLabelNodeDeleteExecutor : public NodeDeleteExecutor {

private:
storage::NodeTable* table;
std::unique_ptr<storage::NodeTable::DeleteState> deleteState;
std::unique_ptr<common::ValueVector> pkVector;
};

class MultiLabelNodeDeleteExecutor : public NodeDeleteExecutor {
Expand All @@ -60,8 +60,7 @@ class MultiLabelNodeDeleteExecutor : public NodeDeleteExecutor {

private:
std::unordered_map<common::table_id_t, storage::NodeTable*> tableIDToTableMap;
std::unordered_map<common::table_id_t, std::unique_ptr<storage::NodeTable::DeleteState>>
deleteStates;
std::unordered_map<common::table_id_t, std::unique_ptr<common::ValueVector>> pkVectors;
};

class RelDeleteExecutor {
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ namespace kuzu {
namespace storage {

class NodeTable;
class TableData;

class NodeGroup {
public:
explicit NodeGroup(catalog::TableSchema* schema, common::CSVReaderConfig* csvReaderConfig);
explicit NodeGroup(NodeTable* table);
explicit NodeGroup(TableData* table);

inline void setNodeGroupIdx(uint64_t nodeGroupIdx_) { this->nodeGroupIdx = nodeGroupIdx_; }
inline uint64_t getNodeGroupIdx() const { return nodeGroupIdx; }
Expand Down
59 changes: 22 additions & 37 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
#include "catalog/catalog.h"
#include "storage/index/hash_index.h"
#include "storage/stats/nodes_statistics_and_deleted_ids.h"
#include "storage/storage_structure/lists/lists.h"
#include "storage/store/node_column.h"
#include "storage/store/node_group.h"
#include "storage/store/table_data.h"
#include "storage/wal/wal.h"

namespace kuzu {
Expand All @@ -17,10 +16,6 @@ namespace storage {

class NodeTable {
public:
struct DeleteState {
std::unique_ptr<common::ValueVector> pkVector;
};

NodeTable(BMFileHandle* dataFH, BMFileHandle* metadataFH,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs, BufferManager& bufferManager,
WAL* wal, catalog::NodeTableSchema* nodeTableSchema);
Expand All @@ -30,32 +25,37 @@ class NodeTable {
inline common::offset_t getMaxNodeOffset(transaction::Transaction* transaction) const {
return nodesStatisticsAndDeletedIDs->getMaxNodeOffset(transaction, tableID);
}

inline void setSelVectorForDeletedOffsets(
transaction::Transaction* trx, std::shared_ptr<common::ValueVector>& vector) const {
assert(vector->isSequential());
nodesStatisticsAndDeletedIDs->setDeletedNodeOffsetsForMorsel(trx, vector, tableID);
}
inline BMFileHandle* getDataFH() const { return dataFH; }

void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
inline BMFileHandle* getDataFH() const { return tableData->getDataFH(); }

inline void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
const std::vector<common::ValueVector*>& outputVectors) {
tableData->read(transaction, nodeIDVector, columnIDs, outputVectors);
}
void insert(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
inline void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector) {
tableData->update(transaction, columnID, nodeIDVector, propertyVector);
}
inline void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector) const;
common::sel_t posInPropertyVector) const {
tableData->update(transaction, columnID, nodeOffset, propertyVector, posInPropertyVector);
}
void delete_(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
DeleteState* deleteState);
void append(NodeGroup* nodeGroup);
common::ValueVector* pkVector);
inline void append(NodeGroup* nodeGroup) { tableData->append(nodeGroup); }

inline common::column_id_t getNumColumns() const { return columns.size(); }
inline common::column_id_t getNumColumns() const { return tableData->getNumColumns(); }
inline NodeColumn* getColumn(common::column_id_t columnID) {
assert(columnID < columns.size());
return columns[columnID].get();
return tableData->getColumn(columnID);
}
inline common::column_id_t getPKColumnID() const { return pkColumnID; }
inline PrimaryKeyIndex* getPKIndex() const { return pkIndex.get(); }
Expand All @@ -64,9 +64,7 @@ class NodeTable {
}
inline common::table_id_t getTableID() const { return tableID; }

inline void dropColumn(common::column_id_t columnID) {
columns.erase(columns.begin() + columnID);
}
inline void dropColumn(common::column_id_t columnID) { tableData->dropColumn(columnID); }
void addColumn(const catalog::Property& property, common::ValueVector* defaultValueVector,
transaction::Transaction* transaction);

Expand All @@ -76,27 +74,14 @@ class NodeTable {
void rollbackInMemory();

private:
void initializeData(catalog::NodeTableSchema* nodeTableSchema);
void initializeColumns(catalog::NodeTableSchema* nodeTableSchema);

void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIds,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIds,
const std::vector<common::ValueVector*>& outputVectors);

void insertPK(common::ValueVector* nodeIDVector, common::ValueVector* primaryKeyVector);
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
assert(!columns.empty());
return columns[0]->getNumNodeGroups(transaction);
return tableData->getNumNodeGroups(transaction);
}

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
std::vector<std::unique_ptr<NodeColumn>> columns;
BMFileHandle* dataFH;
BMFileHandle* metadataFH;
std::unique_ptr<TableData> tableData;
common::column_id_t pkColumnID;
std::unique_ptr<PrimaryKeyIndex> pkIndex;
common::table_id_t tableID;
Expand Down
4 changes: 0 additions & 4 deletions src/include/storage/store/nodes_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ class NodesStore {
NodesStore(BMFileHandle* dataFH, BMFileHandle* metadataFH, const catalog::Catalog& catalog,
BufferManager& bufferManager, WAL* wal);

inline NodeColumn* getNodePropertyColumn(
common::table_id_t tableID, uint64_t propertyIdx) const {
return nodeTables.at(tableID)->getColumn(propertyIdx);
}
inline PrimaryKeyIndex* getPKIndex(common::table_id_t tableID) {
return nodeTables[tableID]->getPKIndex();
}
Expand Down
67 changes: 67 additions & 0 deletions src/include/storage/store/table_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include "storage/store/node_column.h"
#include "storage/store/node_group.h"

namespace kuzu {
namespace storage {

class NodesStatisticsAndDeletedIDs;

class TableData {
public:
TableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, common::table_id_t tableID,
BufferManager* bufferManager, WAL* wal, const std::vector<catalog::Property*>& properties,
TablesStatistics* tablesStatistics);

void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void insert(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector) const;
void append(NodeGroup* nodeGroup);

inline void dropColumn(common::column_id_t columnID) {
columns.erase(columns.begin() + columnID);
}
void addColumn(transaction::Transaction* transaction, const catalog::Property& property,
common::ValueVector* defaultValueVector, TablesStatistics* tableStats);

inline common::vector_idx_t getNumColumns() const { return columns.size(); }
inline NodeColumn* getColumn(common::column_id_t columnID) {
assert(columnID < columns.size());
return columns[columnID].get();
}
inline common::node_group_idx_t getNumNodeGroups(transaction::Transaction* transaction) const {
assert(!columns.empty());
return columns[0]->getNumNodeGroups(transaction);
}
inline BMFileHandle* getDataFH() const { return dataFH; }

void checkpointInMemory();
void rollbackInMemory();

private:
void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);

private:
std::vector<std::unique_ptr<NodeColumn>> columns;
BMFileHandle* dataFH;
BMFileHandle* metadataFH;
common::table_id_t tableID;
BufferManager* bufferManager;
WAL* wal;
};

} // namespace storage
} // namespace kuzu
17 changes: 7 additions & 10 deletions src/processor/operator/persistent/delete_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,32 @@ void NodeDeleteExecutor::init(ResultSet* resultSet, ExecutionContext* context) {

void SingleLabelNodeDeleteExecutor::init(ResultSet* resultSet, ExecutionContext* context) {
NodeDeleteExecutor::init(resultSet, context);
deleteState = std::make_unique<NodeTable::DeleteState>();
auto pkDataType = table->getColumn(table->getPKColumnID())->getDataType();
deleteState->pkVector = std::make_unique<ValueVector>(pkDataType, context->memoryManager);
deleteState->pkVector->state = nodeIDVector->state;
pkVector = std::make_unique<ValueVector>(pkDataType, context->memoryManager);
pkVector->state = nodeIDVector->state;
}

void SingleLabelNodeDeleteExecutor::delete_(ExecutionContext* context) {
table->delete_(context->clientContext->getActiveTransaction(), nodeIDVector, deleteState.get());
table->delete_(context->clientContext->getActiveTransaction(), nodeIDVector, pkVector.get());
}

void MultiLabelNodeDeleteExecutor::init(ResultSet* resultSet, ExecutionContext* context) {
NodeDeleteExecutor::init(resultSet, context);
for (auto& [tableID, table] : tableIDToTableMap) {
deleteStates[tableID] = std::make_unique<NodeTable::DeleteState>();
auto pkDataType = table->getColumn(table->getPKColumnID())->getDataType();
deleteStates[tableID]->pkVector =
std::make_unique<ValueVector>(pkDataType, context->memoryManager);
deleteStates[tableID]->pkVector->state = nodeIDVector->state;
pkVectors[tableID] = std::make_unique<ValueVector>(pkDataType, context->memoryManager);
pkVectors[tableID]->state = nodeIDVector->state;
}
}

void MultiLabelNodeDeleteExecutor::delete_(ExecutionContext* context) {
assert(nodeIDVector->state->selVector->selectedSize == 1);
auto pos = nodeIDVector->state->selVector->selectedPositions[0];
auto nodeID = nodeIDVector->getValue<internalID_t>(pos);
assert(tableIDToTableMap.contains(nodeID.tableID) && deleteStates.contains(nodeID.tableID));
assert(tableIDToTableMap.contains(nodeID.tableID) && pkVectors.contains(nodeID.tableID));
auto table = tableIDToTableMap.at(nodeID.tableID);
table->delete_(context->clientContext->getActiveTransaction(), nodeIDVector,
deleteStates.at(nodeID.tableID).get());
pkVectors.at(nodeID.tableID).get());
}

void RelDeleteExecutor::init(ResultSet* resultSet, ExecutionContext* context) {
Expand Down
1 change: 1 addition & 0 deletions src/storage/store/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ add_library(kuzu_storage_store
struct_column_chunk.cpp
struct_node_column.cpp
table_copy_utils.cpp
table_data.cpp
var_list_column_chunk.cpp
var_list_node_column.cpp)

Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ NodeGroup::NodeGroup(TableSchema* schema, CSVReaderConfig* csvReaderConfig)
}
}

NodeGroup::NodeGroup(NodeTable* table) : nodeGroupIdx{UINT64_MAX}, numNodes{0} {
NodeGroup::NodeGroup(TableData* table) : nodeGroupIdx{UINT64_MAX}, numNodes{0} {
chunks.reserve(table->getNumColumns());
for (auto columnID = 0u; columnID < table->getNumColumns(); columnID++) {
chunks.push_back(
Expand Down
Loading

0 comments on commit b2ec22c

Please sign in to comment.