Skip to content

Commit

Permalink
Merge pull request #2420 from kuzudb/rework-local-storage
Browse files: browse the repository at this point in the history
Add abstraction of LocalTableData to LocalStorage
  • Loading branch information
ray6080 committed Nov 16, 2023
2 parents 507e245 + 3cde464 commit 8ad6ce0
Show file tree
Hide file tree
Showing 21 changed files with 188 additions and 181 deletions.
52 changes: 0 additions & 52 deletions src/include/storage/local_storage.h

This file was deleted.

34 changes: 34 additions & 0 deletions src/include/storage/local_storage/local_storage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "storage/local_storage/local_table.h"

namespace kuzu {
namespace storage {

class Column;
class MemoryManager;

// Maps each table ID to the LocalTable kept for that table.
//
// Data structures in LocalStorage are not thread-safe.
// For now, we only support single-threaded insertions and updates. Once we optimize them with
// multiple threads, LocalStorage and its related data structures should be reworked to be
// thread-safe.
class LocalStorage {
public:
    // `explicit` keeps a bare MemoryManager* from implicitly converting into a LocalStorage.
    // `mm` is stored as a raw pointer (presumably non-owning; confirm against the caller).
    explicit LocalStorage(storage::MemoryManager* mm);

    // Returns the local table data for `tableID`, creating it first if it does not exist yet.
    // `columns` and the stored memory manager are forwarded to the table.
    LocalTableData* getOrCreateLocalTableData(
        common::table_id_t tableID, const std::vector<std::unique_ptr<Column>>& columns);
    // Returns the local table data for `tableID`, or nullptr if it does not exist.
    LocalTableData* getLocalTableData(common::table_id_t tableID);
    // Returns the IDs of all tables that have an entry in this local storage.
    std::unordered_set<common::table_id_t> getTableIDsWithUpdates();

private:
    std::unordered_map<common::table_id_t, std::unique_ptr<LocalTable>> tables;
    storage::MemoryManager* mm;
};

} // namespace storage
} // namespace kuzu
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <map>

#include "common/enums/table_type.h"
#include "common/vector/value_vector.h"

namespace kuzu {
Expand Down Expand Up @@ -79,15 +80,7 @@ class LocalNodeGroup {
friend class NodeTableData;

public:
LocalNodeGroup(
const std::vector<std::unique_ptr<common::LogicalType>>& dataTypes, MemoryManager* mm) {
columns.resize(dataTypes.size());
for (auto i = 0u; i < dataTypes.size(); ++i) {
// To avoid unnecessary memory consumption, we chunk local changes of each column in the
// node group into chunks of size DEFAULT_VECTOR_CAPACITY.
columns[i] = std::make_unique<LocalVectorCollection>(dataTypes[i].get(), mm);
}
}
LocalNodeGroup(std::vector<common::LogicalType*> dataTypes, MemoryManager* mm);

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
Expand All @@ -113,13 +106,12 @@ class LocalNodeGroup {
std::vector<std::unique_ptr<LocalVectorCollection>> columns;
};

class LocalTable {
class LocalTableData {
friend class NodeTableData;

public:
explicit LocalTable(common::table_id_t tableID,
std::vector<std::unique_ptr<common::LogicalType>> dataTypes, MemoryManager* mm)
: tableID{tableID}, dataTypes{std::move(dataTypes)}, mm{mm} {};
LocalTableData(std::vector<common::LogicalType*> dataTypes, MemoryManager* mm)
: dataTypes{std::move(dataTypes)}, mm{mm} {}

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
Expand All @@ -139,11 +131,26 @@ class LocalTable {
common::node_group_idx_t initializeLocalNodeGroup(common::offset_t nodeOffset);

private:
common::table_id_t tableID;
std::vector<std::unique_ptr<common::LogicalType>> dataTypes;
std::vector<common::LogicalType*> dataTypes;
MemoryManager* mm;
std::unordered_map<common::node_group_idx_t, std::unique_ptr<LocalNodeGroup>> nodeGroups;
};

class Column;
// Per-table entry of the local storage: records the table's ID and type, and owns the table's
// LocalTableData once it has been created.
class LocalTable {
public:
    LocalTable(common::table_id_t tableID, common::TableType tableType)
        : tableID{tableID}, tableType{tableType} {}

    // Defined out of line. NOTE(review): judging by its name and parameters, this presumably
    // builds the LocalTableData from `columns` and `mm` on first use; confirm in the .cpp.
    LocalTableData* getOrCreateLocalTableData(
        const std::vector<std::unique_ptr<Column>>& columns, MemoryManager* mm);
    // Returns the current local table data, or nullptr if none has been created yet.
    LocalTableData* getLocalTableData() { return localTableData.get(); }

private:
    common::table_id_t tableID;
    common::TableType tableType;
    // Left null by the constructor.
    std::unique_ptr<LocalTableData> localTableData;
};

} // namespace storage
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using batch_lookup_func_t = read_values_to_page_func_t;

class NullColumn;
class StructColumn;
class LocalVectorCollection;
class Column {
friend class LocalColumn;
friend class StringLocalColumn;
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ class NodeTable : public Table {
common::ValueVector* defaultValueVector) final;
inline void dropColumn(common::column_id_t columnID) final { tableData->dropColumn(columnID); }

void prepareCommit(LocalTable* localTable) final;
void prepareRollback(LocalTable* localTable) final;
void prepareCommit(LocalTableData* localTable) final;
void prepareRollback(LocalTableData* localTable) final;
void checkpointInMemory() final;
void rollbackInMemory() final;

Expand Down
4 changes: 3 additions & 1 deletion src/include/storage/store/node_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace kuzu {
namespace storage {

class LocalTableData;

class NodeTableData : public TableData {
public:
NodeTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, common::table_id_t tableID,
Expand Down Expand Up @@ -33,7 +35,7 @@ class NodeTableData : public TableData {

void append(NodeGroup* nodeGroup) final;

void prepareLocalTableToCommit(LocalTable* localTable);
void prepareLocalTableToCommit(LocalTableData* localTable);
};

} // namespace storage
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class RelTable : public Table {
bwdRelTableData->append(nodeGroup);
}

void prepareCommit(LocalTable* localTable) final;
void prepareRollback(LocalTable* localTable) final;
void prepareCommit(LocalTableData* localTable) final;
void prepareRollback(LocalTableData* localTable) final;
void checkpointInMemory() final;
void rollbackInMemory() final;

Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class RelTableData : public TableData {
common::ColumnDataFormat::CSR;
}

void prepareLocalTableToCommit(LocalTable* localTable);
void prepareLocalTableToCommit(LocalTableData* localTable);

private:
std::unique_ptr<Column> adjColumn;
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class Table {
common::ValueVector* defaultValueVector) = 0;
virtual void dropColumn(common::column_id_t columnID) = 0;

virtual void prepareCommit(LocalTable* localTable) = 0;
virtual void prepareRollback(LocalTable* localTable) = 0;
virtual void prepareCommit(LocalTableData* localTable) = 0;
virtual void prepareRollback(LocalTableData* localTable) = 0;
virtual void checkpointInMemory() = 0;
virtual void rollbackInMemory() = 0;

Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ struct TableReadState {
std::vector<common::column_id_t> columnIDs;
};

class LocalTableData;
class TableData {
public:
virtual ~TableData() = default;
Expand Down Expand Up @@ -43,7 +44,7 @@ class TableData {
return columns[0]->getNumNodeGroups(transaction);
}

virtual void prepareLocalTableToCommit(LocalTable* localTable) = 0;
virtual void prepareLocalTableToCommit(LocalTableData* localTable) = 0;
virtual void checkpointInMemory();
virtual void rollbackInMemory();

Expand Down
7 changes: 5 additions & 2 deletions src/include/transaction/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

#include <memory>

#include "storage/local_storage.h"
#include "storage/local_storage/local_storage.h"

namespace kuzu {
namespace storage {
class LocalStorage;
class MemoryManager;
} // namespace storage
namespace transaction {

class TransactionManager;

// Kind of a transaction; the underlying representation is a single byte.
enum class TransactionType : uint8_t {
    READ_ONLY,
    WRITE,
};
Expand Down
3 changes: 1 addition & 2 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_subdirectory(buffer_manager)
add_subdirectory(compression)
add_subdirectory(index)
add_subdirectory(local_storage)
add_subdirectory(stats)
add_subdirectory(storage_structure)
add_subdirectory(store)
Expand All @@ -9,8 +10,6 @@ add_subdirectory(wal)
add_library(kuzu_storage
OBJECT
file_handle.cpp
local_storage.cpp
local_table.cpp
storage_info.cpp
storage_manager.cpp
storage_utils.cpp
Expand Down
60 changes: 0 additions & 60 deletions src/storage/local_storage.cpp

This file was deleted.

8 changes: 8 additions & 0 deletions src/storage/local_storage/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Compile the local-storage sources into an OBJECT library (object files only, no archive).
add_library(kuzu_storage_local_storage
OBJECT
local_table.cpp
local_storage.cpp)

# Append this target's object files to ALL_OBJECT_FILES and publish the updated list to the
# parent directory scope.
set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_storage_local_storage>
PARENT_SCOPE)
38 changes: 38 additions & 0 deletions src/storage/local_storage/local_storage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "storage/local_storage/local_storage.h"

#include "storage/local_storage/local_table.h"
#include "storage/store/column.h"

using namespace kuzu::common;
using namespace kuzu::transaction;

namespace kuzu {
namespace storage {

// Remembers the memory manager pointer; it is passed on when local table data is created.
LocalStorage::LocalStorage(MemoryManager* mm)
    : mm(mm) {
}

// Returns the local table data for `tableID`. If the table has no LocalTable entry yet, one is
// inserted first. `columns` and the stored memory manager are forwarded to the table, which
// creates its data if needed.
LocalTableData* LocalStorage::getOrCreateLocalTableData(
    common::table_id_t tableID, const std::vector<std::unique_ptr<Column>>& columns) {
    // A single find() replaces the contains()/at() pair, so the key is hashed once on the
    // common (already present) path.
    auto it = tables.find(tableID);
    if (it == tables.end()) {
        // NOTE(review): new entries are always tagged TableType::NODE here; confirm this is
        // intended if other table types start using local storage.
        it = tables.emplace(tableID, std::make_unique<LocalTable>(tableID, TableType::NODE)).first;
    }
    return it->second->getOrCreateLocalTableData(columns, mm);
}

// Returns the local table data for `tableID`, or nullptr when the table has no entry in this
// local storage. Never creates anything.
LocalTableData* LocalStorage::getLocalTableData(common::table_id_t tableID) {
    // One find() instead of contains() followed by at(): the key is hashed only once.
    const auto it = tables.find(tableID);
    if (it == tables.end()) {
        return nullptr;
    }
    return it->second->getLocalTableData();
}

std::unordered_set<table_id_t> LocalStorage::getTableIDsWithUpdates() {
std::unordered_set<table_id_t> tableSetToUpdate;
for (auto& [tableID, _] : tables) {
tableSetToUpdate.insert(tableID);
}
return tableSetToUpdate;
}

} // namespace storage
} // namespace kuzu
Loading

0 comments on commit 8ad6ce0

Please sign in to comment.