Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add abstraction of TableData #2072

Merged
merged 1 commit into from
Sep 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/include/processor/operator/persistent/delete_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SingleLabelNodeDeleteExecutor : public NodeDeleteExecutor {

private:
storage::NodeTable* table;
std::unique_ptr<storage::NodeTable::DeleteState> deleteState;
std::unique_ptr<common::ValueVector> pkVector;
};

class MultiLabelNodeDeleteExecutor : public NodeDeleteExecutor {
Expand All @@ -60,8 +60,7 @@ class MultiLabelNodeDeleteExecutor : public NodeDeleteExecutor {

private:
std::unordered_map<common::table_id_t, storage::NodeTable*> tableIDToTableMap;
std::unordered_map<common::table_id_t, std::unique_ptr<storage::NodeTable::DeleteState>>
deleteStates;
std::unordered_map<common::table_id_t, std::unique_ptr<common::ValueVector>> pkVectors;
};

class RelDeleteExecutor {
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ namespace kuzu {
namespace storage {

class NodeTable;
class TableData;

class NodeGroup {
public:
explicit NodeGroup(catalog::TableSchema* schema, common::CSVReaderConfig* csvReaderConfig);
explicit NodeGroup(NodeTable* table);
explicit NodeGroup(TableData* table);

inline void setNodeGroupIdx(uint64_t nodeGroupIdx_) { this->nodeGroupIdx = nodeGroupIdx_; }
inline uint64_t getNodeGroupIdx() const { return nodeGroupIdx; }
Expand Down
59 changes: 22 additions & 37 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
#include "catalog/catalog.h"
#include "storage/index/hash_index.h"
#include "storage/stats/nodes_statistics_and_deleted_ids.h"
#include "storage/storage_structure/lists/lists.h"
#include "storage/store/node_column.h"
#include "storage/store/node_group.h"
#include "storage/store/table_data.h"
#include "storage/wal/wal.h"

namespace kuzu {
Expand All @@ -17,10 +16,6 @@ namespace storage {

class NodeTable {
public:
struct DeleteState {
std::unique_ptr<common::ValueVector> pkVector;
};

NodeTable(BMFileHandle* dataFH, BMFileHandle* metadataFH,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs, BufferManager& bufferManager,
WAL* wal, catalog::NodeTableSchema* nodeTableSchema);
Expand All @@ -30,32 +25,37 @@ class NodeTable {
inline common::offset_t getMaxNodeOffset(transaction::Transaction* transaction) const {
return nodesStatisticsAndDeletedIDs->getMaxNodeOffset(transaction, tableID);
}

inline void setSelVectorForDeletedOffsets(
transaction::Transaction* trx, std::shared_ptr<common::ValueVector>& vector) const {
assert(vector->isSequential());
nodesStatisticsAndDeletedIDs->setDeletedNodeOffsetsForMorsel(trx, vector, tableID);
}
inline BMFileHandle* getDataFH() const { return dataFH; }

void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
inline BMFileHandle* getDataFH() const { return tableData->getDataFH(); }

inline void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
const std::vector<common::ValueVector*>& outputVectors) {
tableData->read(transaction, nodeIDVector, columnIDs, outputVectors);
}
void insert(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
inline void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector) {
tableData->update(transaction, columnID, nodeIDVector, propertyVector);
}
inline void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector) const;
common::sel_t posInPropertyVector) const {
tableData->update(transaction, columnID, nodeOffset, propertyVector, posInPropertyVector);
}
void delete_(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
DeleteState* deleteState);
void append(NodeGroup* nodeGroup);
common::ValueVector* pkVector);
inline void append(NodeGroup* nodeGroup) { tableData->append(nodeGroup); }

inline common::column_id_t getNumColumns() const { return columns.size(); }
inline common::column_id_t getNumColumns() const { return tableData->getNumColumns(); }
inline NodeColumn* getColumn(common::column_id_t columnID) {
assert(columnID < columns.size());
return columns[columnID].get();
return tableData->getColumn(columnID);
}
inline common::column_id_t getPKColumnID() const { return pkColumnID; }
inline PrimaryKeyIndex* getPKIndex() const { return pkIndex.get(); }
Expand All @@ -64,9 +64,7 @@ class NodeTable {
}
inline common::table_id_t getTableID() const { return tableID; }

inline void dropColumn(common::column_id_t columnID) {
columns.erase(columns.begin() + columnID);
}
inline void dropColumn(common::column_id_t columnID) { tableData->dropColumn(columnID); }
void addColumn(const catalog::Property& property, common::ValueVector* defaultValueVector,
transaction::Transaction* transaction);

Expand All @@ -76,27 +74,14 @@ class NodeTable {
void rollbackInMemory();

private:
void initializeData(catalog::NodeTableSchema* nodeTableSchema);
void initializeColumns(catalog::NodeTableSchema* nodeTableSchema);

void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIds,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIds,
const std::vector<common::ValueVector*>& outputVectors);

void insertPK(common::ValueVector* nodeIDVector, common::ValueVector* primaryKeyVector);
inline uint64_t getNumNodeGroups(transaction::Transaction* transaction) const {
assert(!columns.empty());
return columns[0]->getNumNodeGroups(transaction);
return tableData->getNumNodeGroups(transaction);
}

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
std::vector<std::unique_ptr<NodeColumn>> columns;
BMFileHandle* dataFH;
BMFileHandle* metadataFH;
std::unique_ptr<TableData> tableData;
common::column_id_t pkColumnID;
std::unique_ptr<PrimaryKeyIndex> pkIndex;
common::table_id_t tableID;
Expand Down
4 changes: 0 additions & 4 deletions src/include/storage/store/nodes_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ class NodesStore {
NodesStore(BMFileHandle* dataFH, BMFileHandle* metadataFH, const catalog::Catalog& catalog,
BufferManager& bufferManager, WAL* wal);

inline NodeColumn* getNodePropertyColumn(
common::table_id_t tableID, uint64_t propertyIdx) const {
return nodeTables.at(tableID)->getColumn(propertyIdx);
}
inline PrimaryKeyIndex* getPKIndex(common::table_id_t tableID) {
return nodeTables[tableID]->getPKIndex();
}
Expand Down
67 changes: 67 additions & 0 deletions src/include/storage/store/table_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include "storage/store/node_column.h"
#include "storage/store/node_group.h"

namespace kuzu {
namespace storage {

class NodesStatisticsAndDeletedIDs;

class TableData {
public:
TableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, common::table_id_t tableID,
BufferManager* bufferManager, WAL* wal, const std::vector<catalog::Property*>& properties,
TablesStatistics* tablesStatistics);

void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void insert(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::offset_t nodeOffset, common::ValueVector* propertyVector,
common::sel_t posInPropertyVector) const;
void append(NodeGroup* nodeGroup);

inline void dropColumn(common::column_id_t columnID) {
columns.erase(columns.begin() + columnID);
}
void addColumn(transaction::Transaction* transaction, const catalog::Property& property,
common::ValueVector* defaultValueVector, TablesStatistics* tableStats);

inline common::vector_idx_t getNumColumns() const { return columns.size(); }
inline NodeColumn* getColumn(common::column_id_t columnID) {
assert(columnID < columns.size());
return columns[columnID].get();
}
inline common::node_group_idx_t getNumNodeGroups(transaction::Transaction* transaction) const {
assert(!columns.empty());
return columns[0]->getNumNodeGroups(transaction);
}
inline BMFileHandle* getDataFH() const { return dataFH; }

void checkpointInMemory();
void rollbackInMemory();

private:
void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);

private:
std::vector<std::unique_ptr<NodeColumn>> columns;
BMFileHandle* dataFH;
BMFileHandle* metadataFH;
common::table_id_t tableID;
BufferManager* bufferManager;
WAL* wal;
};

} // namespace storage
} // namespace kuzu
17 changes: 7 additions & 10 deletions src/processor/operator/persistent/delete_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,32 @@ void NodeDeleteExecutor::init(ResultSet* resultSet, ExecutionContext* context) {

void SingleLabelNodeDeleteExecutor::init(ResultSet* resultSet, ExecutionContext* context) {
NodeDeleteExecutor::init(resultSet, context);
deleteState = std::make_unique<NodeTable::DeleteState>();
auto pkDataType = table->getColumn(table->getPKColumnID())->getDataType();
deleteState->pkVector = std::make_unique<ValueVector>(pkDataType, context->memoryManager);
deleteState->pkVector->state = nodeIDVector->state;
pkVector = std::make_unique<ValueVector>(pkDataType, context->memoryManager);
pkVector->state = nodeIDVector->state;
}

void SingleLabelNodeDeleteExecutor::delete_(ExecutionContext* context) {
table->delete_(context->clientContext->getActiveTransaction(), nodeIDVector, deleteState.get());
table->delete_(context->clientContext->getActiveTransaction(), nodeIDVector, pkVector.get());
}

void MultiLabelNodeDeleteExecutor::init(ResultSet* resultSet, ExecutionContext* context) {
NodeDeleteExecutor::init(resultSet, context);
for (auto& [tableID, table] : tableIDToTableMap) {
deleteStates[tableID] = std::make_unique<NodeTable::DeleteState>();
auto pkDataType = table->getColumn(table->getPKColumnID())->getDataType();
deleteStates[tableID]->pkVector =
std::make_unique<ValueVector>(pkDataType, context->memoryManager);
deleteStates[tableID]->pkVector->state = nodeIDVector->state;
pkVectors[tableID] = std::make_unique<ValueVector>(pkDataType, context->memoryManager);
pkVectors[tableID]->state = nodeIDVector->state;
}
}

void MultiLabelNodeDeleteExecutor::delete_(ExecutionContext* context) {
assert(nodeIDVector->state->selVector->selectedSize == 1);
auto pos = nodeIDVector->state->selVector->selectedPositions[0];
auto nodeID = nodeIDVector->getValue<internalID_t>(pos);
assert(tableIDToTableMap.contains(nodeID.tableID) && deleteStates.contains(nodeID.tableID));
assert(tableIDToTableMap.contains(nodeID.tableID) && pkVectors.contains(nodeID.tableID));
auto table = tableIDToTableMap.at(nodeID.tableID);
table->delete_(context->clientContext->getActiveTransaction(), nodeIDVector,
deleteStates.at(nodeID.tableID).get());
pkVectors.at(nodeID.tableID).get());
}

void RelDeleteExecutor::init(ResultSet* resultSet, ExecutionContext* context) {
Expand Down
1 change: 1 addition & 0 deletions src/storage/store/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ add_library(kuzu_storage_store
struct_column_chunk.cpp
struct_node_column.cpp
table_copy_utils.cpp
table_data.cpp
var_list_column_chunk.cpp
var_list_node_column.cpp)

Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ NodeGroup::NodeGroup(TableSchema* schema, CSVReaderConfig* csvReaderConfig)
}
}

NodeGroup::NodeGroup(NodeTable* table) : nodeGroupIdx{UINT64_MAX}, numNodes{0} {
NodeGroup::NodeGroup(TableData* table) : nodeGroupIdx{UINT64_MAX}, numNodes{0} {
chunks.reserve(table->getNumColumns());
for (auto columnID = 0u; columnID < table->getNumColumns(); columnID++) {
chunks.push_back(
Expand Down
Loading