Skip to content

Commit

Permalink
table scan/update/insert/delete state
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 19, 2024
1 parent 0531afe commit 7cc8275
Show file tree
Hide file tree
Showing 60 changed files with 806 additions and 568 deletions.
2 changes: 1 addition & 1 deletion src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "common/serializer/deserializer.h"
#include "common/serializer/serializer.h"
#include "common/string_format.h"
#include "storage/storage_info.h"
#include "storage/storage_utils.h"
#include "storage/storage_version_info.h"

using namespace kuzu::binder;
using namespace kuzu::common;
Expand Down
5 changes: 2 additions & 3 deletions src/function/table/call/storage_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "function/table/bind_input.h"
#include "function/table/call_functions.h"
#include "storage/storage_manager.h"
#include "storage/store/node_table.h"
#include "storage/store/string_column.h"
#include "storage/store/struct_column.h"
#include "storage/store/var_list_column.h"
Expand Down Expand Up @@ -218,9 +219,7 @@ static std::unique_ptr<TableFuncBindData> bindFunc(
auto tableID = catalog->getTableID(context->getTx(), tableName);
auto tableEntry = catalog->getTableCatalogEntry(context->getTx(), tableID);
auto storageManager = context->getStorageManager();
auto table = tableEntry->getTableType() == TableType::NODE ?
reinterpret_cast<Table*>(storageManager->getNodeTable(tableID)) :
reinterpret_cast<Table*>(storageManager->getRelTable(tableID));
auto table = storageManager->getTable(tableID);
return std::make_unique<StorageInfoBindData>(
std::move(columnTypes), std::move(columnNames), tableEntry, table, context);
}
Expand Down
6 changes: 5 additions & 1 deletion src/include/processor/operator/persistent/insert_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
namespace kuzu {
namespace processor {

// TODO(Guodong): the following class should be moved to storage.
class NodeInsertExecutor {
public:
NodeInsertExecutor(storage::NodeTable* table,
Expand All @@ -32,6 +31,9 @@ class NodeInsertExecutor {
private:
NodeInsertExecutor(const NodeInsertExecutor& other);

bool checkConfict(transaction::Transaction* transaction);
void writeResult();

private:
// Node table to insert.
storage::NodeTable* table;
Expand Down Expand Up @@ -70,6 +72,8 @@ class RelInsertExecutor {
private:
RelInsertExecutor(const RelInsertExecutor& other);

void writeResult();

private:
storage::RelsStoreStats* relsStatistics;
storage::RelTable* table;
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/scan/scan_multi_node_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ class ScanMultiNodeTables : public ScanTable {

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
ScanTable::initLocalStateInternal(resultSet, context);
for (auto& [tableID, _] : tables) {
readStates[tableID] = std::make_unique<storage::TableReadState>();
for (auto& [tableID, scanInfo] : tables) {
readStates[tableID] = std::make_unique<storage::TableReadState>(
*inVector, scanInfo->columnIDs, outVectors);
}
}
bool getNextTuplesInternal(ExecutionContext* context) override;
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/scan/scan_multi_rel_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ class RelTableCollectionScanner {
nextTableIdx = 0;
}

void init();
void init(
common::ValueVector* inVector, const std::vector<common::ValueVector*>& outputVectors);
bool scan(common::ValueVector* inVector, const std::vector<common::ValueVector*>& outputVectors,
transaction::Transaction* transaction);

std::unique_ptr<RelTableCollectionScanner> clone() const;

private:
std::vector<std::unique_ptr<ScanRelTableInfo>> scanInfos;
std::vector<std::unique_ptr<storage::RelDataReadState>> readStates;
std::vector<std::unique_ptr<storage::RelTableReadState>> readStates;
uint32_t currentTableIdx = UINT32_MAX;
uint32_t nextTableIdx = 0;
};
Expand Down
3 changes: 2 additions & 1 deletion src/include/processor/operator/scan/scan_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class ScanSingleNodeTable : public ScanTable {
inline void initLocalStateInternal(
ResultSet* resultSet, ExecutionContext* executionContext) final {
ScanTable::initLocalStateInternal(resultSet, executionContext);
readState = std::make_unique<storage::TableReadState>();
readState =
std::make_unique<storage::TableReadState>(*inVector, info->columnIDs, outVectors);
}

bool getNextTuplesInternal(ExecutionContext* context) override;
Expand Down
13 changes: 9 additions & 4 deletions src/include/processor/operator/scan/scan_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ class ScanRelTable : public ScanTable {
std::vector<DataPos> outVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{PhysicalOperatorType::SCAN_REL_TABLE, std::move(info), inVectorPos,
std::move(outVectorsPos), std::move(child), id, paramsString} {
scanState = std::make_unique<storage::RelDataReadState>();
}
std::move(outVectorsPos), std::move(child), id, paramsString} {}
~ScanRelTable() override = default;

inline void initLocalStateInternal(
ResultSet* resultSet, ExecutionContext* executionContext) override {
ScanTable::initLocalStateInternal(resultSet, executionContext);
scanState = std::make_unique<storage::RelTableReadState>(
*inVector, info->columnIDs, outVectors, info->direction);
}

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
Expand All @@ -50,7 +55,7 @@ class ScanRelTable : public ScanTable {

protected:
std::unique_ptr<ScanRelTableInfo> info;
std::unique_ptr<storage::RelDataReadState> scanState;
std::unique_ptr<storage::RelTableReadState> scanState;
};

} // namespace processor
Expand Down
18 changes: 18 additions & 0 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,23 @@ class LocalNodeTableData final : public LocalTableData {
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
};

class LocalNodeTable final : public LocalTable {

Check warning on line 53 in src/include/storage/local_storage/local_node_table.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/local_storage/local_node_table.h#L53

Added line #L53 was not covered by tests
public:
explicit LocalNodeTable(Table& table);

bool insert(TableInsertState& insertState) override;
bool update(TableUpdateState& updateState) override;
bool delete_(TableDeleteState& deleteState) override;

void scan(TableReadState& state) override;
void lookup(TableReadState& state) override;

LocalNodeTableData* getTableData() {
KU_ASSERT(localTableDataCollection.size() == 1);
return common::ku_dynamic_cast<LocalTableData*, LocalNodeTableData*>(
localTableDataCollection[0].get());
}
};

} // namespace storage
} // namespace kuzu
33 changes: 22 additions & 11 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "common/copy_constructors.h"
#include "common/enums/rel_multiplicity.h"
#include "common/enums/rel_direction.h"
#include "common/vector/value_vector.h"
#include "storage/local_storage/local_table.h"

Expand All @@ -15,8 +15,7 @@ class LocalRelNG final : public LocalNodeGroup {
friend class RelTableData;

public:
LocalRelNG(common::offset_t nodeGroupStartOffset, std::vector<common::LogicalType> dataTypes,
common::RelMultiplicity multiplicity);
LocalRelNG(common::offset_t nodeGroupStartOffset, std::vector<common::LogicalType> dataTypes);
DELETE_COPY_DEFAULT_MOVE(LocalRelNG);

common::row_idx_t scanCSR(common::offset_t srcOffset, common::offset_t posToReadForOffset,
Expand Down Expand Up @@ -46,24 +45,36 @@ class LocalRelNG final : public LocalNodeGroup {
void applyCSRUpdates(common::column_id_t columnID, common::ValueVector* relIDVector,
common::ValueVector* outputVector);
void applyCSRDeletions(common::offset_t srcOffsetInChunk, common::ValueVector* relIDVector);

private:
common::RelMultiplicity multiplicity;
};

class LocalRelTableData final : public LocalTableData {
friend class RelTableData;

public:
LocalRelTableData(
common::RelMultiplicity multiplicity, std::vector<common::LogicalType> dataTypes)
: LocalTableData{std::move(dataTypes)}, multiplicity{multiplicity} {}
explicit LocalRelTableData(std::vector<common::LogicalType> dataTypes)
: LocalTableData{std::move(dataTypes)} {}

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
};

private:
common::RelMultiplicity multiplicity;
class LocalRelTable final : public LocalTable {

Check warning on line 61 in src/include/storage/local_storage/local_rel_table.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/local_storage/local_rel_table.h#L61

Added line #L61 was not covered by tests
public:
explicit LocalRelTable(Table& table);

bool insert(TableInsertState& insertState) override;
bool update(TableUpdateState& updateState) override;
bool delete_(TableDeleteState& deleteState) override;

void scan(TableReadState& state) override;
void lookup(TableReadState& state) override;

LocalRelTableData* getTableData(common::RelDataDirection direction) {
KU_ASSERT(localTableDataCollection.size() == 2);
return common::ku_dynamic_cast<LocalTableData*, LocalRelTableData*>(
direction == common::RelDataDirection::FWD ? localTableDataCollection[0].get() :
localTableDataCollection[1].get());
}
};

} // namespace storage
Expand Down
22 changes: 12 additions & 10 deletions src/include/storage/local_storage/local_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include "storage/local_storage/local_table.h"

namespace kuzu {
namespace main {
class ClientContext;
} // namespace main
namespace storage {

// Data structures in LocalStorage are not thread-safe.
Expand All @@ -14,20 +17,19 @@ namespace storage {
// thread-safe.
class LocalStorage {
public:
explicit LocalStorage() {}
enum class NotExistAction { CREATE, RETURN_NULL };

explicit LocalStorage(main::ClientContext& clientContext) : clientContext{clientContext} {}
DELETE_COPY_AND_MOVE(LocalStorage);

// This function will create the local table data if not exists.
LocalTableData* getOrCreateLocalTableData(common::table_id_t tableID,
const std::vector<std::unique_ptr<Column>>& columns,
common::TableType tableType = common::TableType::NODE, common::vector_idx_t dataIdx = 0,
common::RelMultiplicity multiplicity = common::RelMultiplicity::MANY);
LocalTable* getLocalTable(common::table_id_t tableID);
// This function will return nullptr if the local table does not exist.
LocalTableData* getLocalTableData(common::table_id_t tableID, common::vector_idx_t dataIdx = 0);
std::unordered_set<common::table_id_t> getTableIDsWithUpdates();
LocalTable* getLocalTable(
common::table_id_t tableID, NotExistAction action = NotExistAction::RETURN_NULL);

void prepareCommit();
void prepareRollback();

private:
main::ClientContext& clientContext;
std::unordered_map<common::table_id_t, std::unique_ptr<LocalTable>> tables;
};

Expand Down
31 changes: 19 additions & 12 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include <unordered_map>

#include "common/enums/rel_multiplicity.h"
#include "common/enums/table_type.h"
#include "common/vector/value_vector.h"
#include "storage/store/node_group.h"

Expand Down Expand Up @@ -192,20 +190,29 @@ class LocalTableData {
std::unordered_map<common::node_group_idx_t, std::unique_ptr<LocalNodeGroup>> nodeGroups;
};

class Column;
struct TableInsertState;
struct TableUpdateState;
struct TableDeleteState;
struct TableReadState;
class Table;
class LocalTable {
public:
explicit LocalTable(common::TableType tableType) : tableType{tableType} {};
virtual ~LocalTable() = default;

Check warning on line 200 in src/include/storage/local_storage/local_table.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/local_storage/local_table.h#L200

Added line #L200 was not covered by tests

LocalTableData* getOrCreateLocalTableData(const std::vector<std::unique_ptr<Column>>& columns,
common::vector_idx_t dataIdx, common::RelMultiplicity multiplicity);
inline LocalTableData* getLocalTableData(common::vector_idx_t dataIdx) {
KU_ASSERT(dataIdx < localTableDataCollection.size());
return localTableDataCollection[dataIdx].get();
}
virtual bool insert(TableInsertState& insertState) = 0;
virtual bool update(TableUpdateState& updateState) = 0;
virtual bool delete_(TableDeleteState& deleteState) = 0;

private:
common::TableType tableType;
virtual void scan(TableReadState& state) = 0;
virtual void lookup(TableReadState& state) = 0;

inline void clear() { localTableDataCollection.clear(); }

protected:
explicit LocalTable(Table& table) : table{table} {}

protected:
Table& table;
// For a node table, it should only contain one LocalTableData, while a rel table should contain
// two, one for each direction.
std::vector<std::unique_ptr<LocalTableData>> localTableDataCollection;
Expand Down
15 changes: 3 additions & 12 deletions src/include/storage/storage_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "storage/index/hash_index.h"
#include "storage/stats/nodes_store_statistics.h"
#include "storage/stats/rels_store_statistics.h"
#include "storage/store/node_table.h"
#include "storage/store/rel_table.h"
#include "storage/wal/wal.h"

Expand All @@ -27,17 +26,9 @@ class StorageManager {

PrimaryKeyIndex* getPKIndex(common::table_id_t tableID);

inline NodeTable* getNodeTable(common::table_id_t tableID) const {
KU_ASSERT(tables.contains(tableID) &&
tables.at(tableID)->getTableType() == common::TableType::NODE);
auto table = common::ku_dynamic_cast<Table*, NodeTable*>(tables.at(tableID).get());
return table;
}
inline RelTable* getRelTable(common::table_id_t tableID) const {
KU_ASSERT(tables.contains(tableID) &&
tables.at(tableID)->getTableType() == common::TableType::REL);
auto table = common::ku_dynamic_cast<Table*, RelTable*>(tables.at(tableID).get());
return table;
inline Table* getTable(common::table_id_t tableID) const {
KU_ASSERT(tables.contains(tableID));
return tables.at(tableID).get();
}

inline WAL* getWAL() const { return wal; }
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class ChunkedNodeGroupCollection {
}
inline uint64_t getNumChunks() const { return chunkedGroups.size(); }
void append(std::unique_ptr<ChunkedNodeGroup> chunkedGroup);
inline void clear() { chunkedGroups.clear(); }

private:
// Assert that all chunked node groups have the same num columns and same data types.
Expand Down
Loading

0 comments on commit 7cc8275

Please sign in to comment.