Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Table states #3072

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "common/serializer/deserializer.h"
#include "common/serializer/serializer.h"
#include "common/string_format.h"
#include "storage/storage_info.h"
#include "storage/storage_utils.h"
#include "storage/storage_version_info.h"

using namespace kuzu::binder;
using namespace kuzu::common;
Expand Down
5 changes: 2 additions & 3 deletions src/function/table/call/storage_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "function/table/bind_input.h"
#include "function/table/call_functions.h"
#include "storage/storage_manager.h"
#include "storage/store/node_table.h"
#include "storage/store/string_column.h"
#include "storage/store/struct_column.h"
#include "storage/store/var_list_column.h"
Expand Down Expand Up @@ -218,9 +219,7 @@ static std::unique_ptr<TableFuncBindData> bindFunc(
auto tableID = catalog->getTableID(context->getTx(), tableName);
auto tableEntry = catalog->getTableCatalogEntry(context->getTx(), tableID);
auto storageManager = context->getStorageManager();
auto table = tableEntry->getTableType() == TableType::NODE ?
reinterpret_cast<Table*>(storageManager->getNodeTable(tableID)) :
reinterpret_cast<Table*>(storageManager->getRelTable(tableID));
auto table = storageManager->getTable(tableID);
return std::make_unique<StorageInfoBindData>(
std::move(columnTypes), std::move(columnNames), tableEntry, table, context);
}
Expand Down
6 changes: 5 additions & 1 deletion src/include/processor/operator/persistent/insert_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
namespace kuzu {
namespace processor {

// TODO(Guodong): the following class should be moved to storage.
class NodeInsertExecutor {
public:
NodeInsertExecutor(storage::NodeTable* table,
Expand All @@ -32,6 +31,9 @@ class NodeInsertExecutor {
private:
NodeInsertExecutor(const NodeInsertExecutor& other);

bool checkConfict(transaction::Transaction* transaction);
void writeResult();

private:
// Node table to insert.
storage::NodeTable* table;
Expand Down Expand Up @@ -70,6 +72,8 @@ class RelInsertExecutor {
private:
RelInsertExecutor(const RelInsertExecutor& other);

void writeResult();

private:
storage::RelsStoreStats* relsStatistics;
storage::RelTable* table;
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/scan/scan_multi_node_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ class ScanMultiNodeTables : public ScanTable {

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
ScanTable::initLocalStateInternal(resultSet, context);
for (auto& [tableID, _] : tables) {
readStates[tableID] = std::make_unique<storage::TableReadState>();
for (auto& [tableID, scanInfo] : tables) {
readStates[tableID] = std::make_unique<storage::TableReadState>(
*inVector, scanInfo->columnIDs, outVectors);
}
}
bool getNextTuplesInternal(ExecutionContext* context) override;
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/scan/scan_multi_rel_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ class RelTableCollectionScanner {
nextTableIdx = 0;
}

void init();
void init(
common::ValueVector* inVector, const std::vector<common::ValueVector*>& outputVectors);
bool scan(common::ValueVector* inVector, const std::vector<common::ValueVector*>& outputVectors,
transaction::Transaction* transaction);

std::unique_ptr<RelTableCollectionScanner> clone() const;

private:
std::vector<std::unique_ptr<ScanRelTableInfo>> scanInfos;
std::vector<std::unique_ptr<storage::RelDataReadState>> readStates;
std::vector<std::unique_ptr<storage::RelTableReadState>> readStates;
uint32_t currentTableIdx = UINT32_MAX;
uint32_t nextTableIdx = 0;
};
Expand Down
3 changes: 2 additions & 1 deletion src/include/processor/operator/scan/scan_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class ScanSingleNodeTable : public ScanTable {
inline void initLocalStateInternal(
ResultSet* resultSet, ExecutionContext* executionContext) final {
ScanTable::initLocalStateInternal(resultSet, executionContext);
readState = std::make_unique<storage::TableReadState>();
readState =
std::make_unique<storage::TableReadState>(*inVector, info->columnIDs, outVectors);
}

bool getNextTuplesInternal(ExecutionContext* context) override;
Expand Down
15 changes: 11 additions & 4 deletions src/include/processor/operator/scan/scan_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@ class ScanRelTable : public ScanTable {
std::vector<DataPos> outVectorsPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
: ScanRelTable{PhysicalOperatorType::SCAN_REL_TABLE, std::move(info), inVectorPos,
std::move(outVectorsPos), std::move(child), id, paramsString} {
scanState = std::make_unique<storage::RelDataReadState>();
}
std::move(outVectorsPos), std::move(child), id, paramsString} {}
~ScanRelTable() override = default;

inline void initLocalStateInternal(
ResultSet* resultSet, ExecutionContext* executionContext) override {
ScanTable::initLocalStateInternal(resultSet, executionContext);
if (info) {
scanState = std::make_unique<storage::RelTableReadState>(
*inVector, info->columnIDs, outVectors, info->direction);
}
}

bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
Expand All @@ -50,7 +57,7 @@ class ScanRelTable : public ScanTable {

protected:
std::unique_ptr<ScanRelTableInfo> info;
std::unique_ptr<storage::RelDataReadState> scanState;
std::unique_ptr<storage::RelTableReadState> scanState;
};

} // namespace processor
Expand Down
18 changes: 18 additions & 0 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,23 @@
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
};

class LocalNodeTable final : public LocalTable {

Check warning on line 53 in src/include/storage/local_storage/local_node_table.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/local_storage/local_node_table.h#L53

Added line #L53 was not covered by tests
public:
explicit LocalNodeTable(Table& table);

bool insert(TableInsertState& insertState) override;
bool update(TableUpdateState& updateState) override;
bool delete_(TableDeleteState& deleteState) override;

void scan(TableReadState& state) override;
void lookup(TableReadState& state) override;

LocalNodeTableData* getTableData() {
KU_ASSERT(localTableDataCollection.size() == 1);
return common::ku_dynamic_cast<LocalTableData*, LocalNodeTableData*>(
localTableDataCollection[0].get());
}
};

} // namespace storage
} // namespace kuzu
33 changes: 22 additions & 11 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "common/copy_constructors.h"
#include "common/enums/rel_multiplicity.h"
#include "common/enums/rel_direction.h"
#include "common/vector/value_vector.h"
#include "storage/local_storage/local_table.h"

Expand All @@ -15,8 +15,7 @@
friend class RelTableData;

public:
LocalRelNG(common::offset_t nodeGroupStartOffset, std::vector<common::LogicalType> dataTypes,
common::RelMultiplicity multiplicity);
LocalRelNG(common::offset_t nodeGroupStartOffset, std::vector<common::LogicalType> dataTypes);
DELETE_COPY_DEFAULT_MOVE(LocalRelNG);

common::row_idx_t scanCSR(common::offset_t srcOffset, common::offset_t posToReadForOffset,
Expand Down Expand Up @@ -46,24 +45,36 @@
void applyCSRUpdates(common::column_id_t columnID, common::ValueVector* relIDVector,
common::ValueVector* outputVector);
void applyCSRDeletions(common::offset_t srcOffsetInChunk, common::ValueVector* relIDVector);

private:
common::RelMultiplicity multiplicity;
};

class LocalRelTableData final : public LocalTableData {
friend class RelTableData;

public:
LocalRelTableData(
common::RelMultiplicity multiplicity, std::vector<common::LogicalType> dataTypes)
: LocalTableData{std::move(dataTypes)}, multiplicity{multiplicity} {}
explicit LocalRelTableData(std::vector<common::LogicalType> dataTypes)
: LocalTableData{std::move(dataTypes)} {}

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
};

private:
common::RelMultiplicity multiplicity;
class LocalRelTable final : public LocalTable {

Check warning on line 61 in src/include/storage/local_storage/local_rel_table.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/local_storage/local_rel_table.h#L61

Added line #L61 was not covered by tests
public:
explicit LocalRelTable(Table& table);

bool insert(TableInsertState& insertState) override;
bool update(TableUpdateState& updateState) override;
bool delete_(TableDeleteState& deleteState) override;

void scan(TableReadState& state) override;
void lookup(TableReadState& state) override;

LocalRelTableData* getTableData(common::RelDataDirection direction) {
KU_ASSERT(localTableDataCollection.size() == 2);
return common::ku_dynamic_cast<LocalTableData*, LocalRelTableData*>(
direction == common::RelDataDirection::FWD ? localTableDataCollection[0].get() :
localTableDataCollection[1].get());
}
};

} // namespace storage
Expand Down
22 changes: 12 additions & 10 deletions src/include/storage/local_storage/local_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include "storage/local_storage/local_table.h"

namespace kuzu {
namespace main {
class ClientContext;
} // namespace main
namespace storage {

// Data structures in LocalStorage are not thread-safe.
Expand All @@ -14,20 +17,19 @@ namespace storage {
// thread-safe.
class LocalStorage {
public:
explicit LocalStorage() {}
enum class NotExistAction { CREATE, RETURN_NULL };

explicit LocalStorage(main::ClientContext& clientContext) : clientContext{clientContext} {}
DELETE_COPY_AND_MOVE(LocalStorage);

// This function will create the local table data if not exists.
LocalTableData* getOrCreateLocalTableData(common::table_id_t tableID,
const std::vector<std::unique_ptr<Column>>& columns,
common::TableType tableType = common::TableType::NODE, common::vector_idx_t dataIdx = 0,
common::RelMultiplicity multiplicity = common::RelMultiplicity::MANY);
LocalTable* getLocalTable(common::table_id_t tableID);
// This function will return nullptr if the local table does not exist.
LocalTableData* getLocalTableData(common::table_id_t tableID, common::vector_idx_t dataIdx = 0);
std::unordered_set<common::table_id_t> getTableIDsWithUpdates();
LocalTable* getLocalTable(
common::table_id_t tableID, NotExistAction action = NotExistAction::RETURN_NULL);

void prepareCommit();
void prepareRollback();

private:
main::ClientContext& clientContext;
std::unordered_map<common::table_id_t, std::unique_ptr<LocalTable>> tables;
};

Expand Down
31 changes: 19 additions & 12 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include <unordered_map>

#include "common/enums/rel_multiplicity.h"
#include "common/enums/table_type.h"
#include "common/vector/value_vector.h"
#include "storage/store/chunked_node_group_collection.h"

Expand Down Expand Up @@ -198,20 +196,29 @@
std::unordered_map<common::node_group_idx_t, std::unique_ptr<LocalNodeGroup>> nodeGroups;
};

class Column;
struct TableInsertState;
struct TableUpdateState;
struct TableDeleteState;
struct TableReadState;
class Table;
class LocalTable {
public:
explicit LocalTable(common::TableType tableType) : tableType{tableType} {};
virtual ~LocalTable() = default;

Check warning on line 206 in src/include/storage/local_storage/local_table.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/local_storage/local_table.h#L206

Added line #L206 was not covered by tests

LocalTableData* getOrCreateLocalTableData(const std::vector<std::unique_ptr<Column>>& columns,
common::vector_idx_t dataIdx, common::RelMultiplicity multiplicity);
inline LocalTableData* getLocalTableData(common::vector_idx_t dataIdx) {
KU_ASSERT(dataIdx < localTableDataCollection.size());
return localTableDataCollection[dataIdx].get();
}
virtual bool insert(TableInsertState& insertState) = 0;
virtual bool update(TableUpdateState& updateState) = 0;
virtual bool delete_(TableDeleteState& deleteState) = 0;

private:
common::TableType tableType;
virtual void scan(TableReadState& state) = 0;
virtual void lookup(TableReadState& state) = 0;

inline void clear() { localTableDataCollection.clear(); }

protected:
explicit LocalTable(Table& table) : table{table} {}

protected:
Table& table;
// For a node table, it should only contain one LocalTableData, while a rel table should contain
// two, one for each direction.
std::vector<std::unique_ptr<LocalTableData>> localTableDataCollection;
Expand Down
15 changes: 3 additions & 12 deletions src/include/storage/storage_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "storage/index/hash_index.h"
#include "storage/stats/nodes_store_statistics.h"
#include "storage/stats/rels_store_statistics.h"
#include "storage/store/node_table.h"
#include "storage/store/rel_table.h"
#include "storage/wal/wal.h"

Expand All @@ -27,17 +26,9 @@ class StorageManager {

PrimaryKeyIndex* getPKIndex(common::table_id_t tableID);

inline NodeTable* getNodeTable(common::table_id_t tableID) const {
KU_ASSERT(tables.contains(tableID) &&
tables.at(tableID)->getTableType() == common::TableType::NODE);
auto table = common::ku_dynamic_cast<Table*, NodeTable*>(tables.at(tableID).get());
return table;
}
inline RelTable* getRelTable(common::table_id_t tableID) const {
KU_ASSERT(tables.contains(tableID) &&
tables.at(tableID)->getTableType() == common::TableType::REL);
auto table = common::ku_dynamic_cast<Table*, RelTable*>(tables.at(tableID).get());
return table;
inline Table* getTable(common::table_id_t tableID) const {
KU_ASSERT(tables.contains(tableID));
return tables.at(tableID).get();
}

inline WAL* getWAL() const { return wal; }
Expand Down
Loading
Loading