diff --git a/src/include/storage/local_storage.h b/src/include/storage/local_storage.h deleted file mode 100644 index 9c5715f28b..0000000000 --- a/src/include/storage/local_storage.h +++ /dev/null @@ -1,52 +0,0 @@ -#pragma once - -#include "storage/local_table.h" - -namespace kuzu { -namespace storage { - -class Column; - -// Data structures in LocalStorage are not thread-safe. -// For now, we only support single thread insertions and updates. Once we optimize them with -// multiple threads, LocalStorage and its related data structures should be reworked to be -// thread-safe. -class LocalStorage { -public: - LocalStorage(storage::MemoryManager* mm); - - void scan(common::table_id_t tableID, common::ValueVector* nodeIDVector, - const std::vector& columnIDs, - const std::vector& outputVectors); - void lookup(common::table_id_t tableID, common::ValueVector* nodeIDVector, - const std::vector& columnIDs, - const std::vector& outputVectors); - - // Note: `initializeLocalTable` should be called before `insert` and `update`. - void initializeLocalTable( - common::table_id_t tableID, const std::vector>& columns); - void insert(common::table_id_t tableID, common::ValueVector* nodeIDVector, - const std::vector& propertyVectors); - void update(common::table_id_t tableID, common::ValueVector* nodeIDVector, - common::column_id_t columnID, common::ValueVector* propertyVector); - void delete_(common::table_id_t tableID, common::ValueVector* nodeIDVector); - - inline std::unordered_set getTableIDsWithUpdates() { - std::unordered_set tableSetToUpdate; - for (auto& [tableID, _] : tables) { - tableSetToUpdate.insert(tableID); - } - return tableSetToUpdate; - } - inline LocalTable* getLocalTable(common::table_id_t tableID) { - KU_ASSERT(tables.contains(tableID)); - return tables.at(tableID).get(); - } - -private: - std::map> tables; - storage::MemoryManager* mm; -}; - -} // namespace storage -} // namespace kuzu diff --git a/src/include/storage/local_storage/local_storage.h b/src/include/storage/local_storage/local_storage.h new file mode 100644 index 0000000000..70c786073f --- /dev/null +++ b/src/include/storage/local_storage/local_storage.h @@ -0,0 +1,34 @@ +#pragma once + +#include + +#include "storage/local_storage/local_table.h" + +namespace kuzu { +namespace storage { + +class Column; +class MemoryManager; + +// Data structures in LocalStorage are not thread-safe. +// For now, we only support single thread insertions and updates. Once we optimize them with +// multiple threads, LocalStorage and its related data structures should be reworked to be +// thread-safe. +class LocalStorage { +public: + LocalStorage(storage::MemoryManager* mm); + + // This function will create the local table data if not exists. + LocalTableData* getOrCreateLocalTableData( + common::table_id_t tableID, const std::vector>& columns); + // This function will return nullptr if the local table data does not exist. + LocalTableData* getLocalTableData(common::table_id_t tableID); + std::unordered_set getTableIDsWithUpdates(); + +private: + std::unordered_map> tables; + storage::MemoryManager* mm; +}; + +} // namespace storage +} // namespace kuzu diff --git a/src/include/storage/local_table.h b/src/include/storage/local_storage/local_table.h similarity index 87% rename from src/include/storage/local_table.h rename to src/include/storage/local_storage/local_table.h index d1ed4abe4f..ea62f8e9b7 100644 --- a/src/include/storage/local_table.h +++ b/src/include/storage/local_storage/local_table.h @@ -2,6 +2,7 @@ #include +#include "common/enums/table_type.h" #include "common/vector/value_vector.h" namespace kuzu { @@ -79,15 +80,7 @@ class LocalNodeGroup { friend class NodeTableData; public: - LocalNodeGroup( - const std::vector>& dataTypes, MemoryManager* mm) { - columns.resize(dataTypes.size()); - for (auto i = 0u; i < dataTypes.size(); ++i) { - // To avoid unnecessary memory consumption, we chunk local changes of each column in the - // node group into chunks of size DEFAULT_VECTOR_CAPACITY. - columns[i] = std::make_unique(dataTypes[i].get(), mm); - } - } + LocalNodeGroup(std::vector dataTypes, MemoryManager* mm); void scan(common::ValueVector* nodeIDVector, const std::vector& columnIDs, const std::vector& outputVectors); @@ -113,13 +106,12 @@ class LocalNodeGroup { std::vector> columns; }; -class LocalTable { +class LocalTableData { friend class NodeTableData; public: - explicit LocalTable(common::table_id_t tableID, - std::vector> dataTypes, MemoryManager* mm) - : tableID{tableID}, dataTypes{std::move(dataTypes)}, mm{mm} {}; + LocalTableData(std::vector dataTypes, MemoryManager* mm) + : dataTypes{std::move(dataTypes)}, mm{mm} {} void scan(common::ValueVector* nodeIDVector, const std::vector& columnIDs, const std::vector& outputVectors); @@ -139,11 +131,26 @@ class LocalTable { common::node_group_idx_t initializeLocalNodeGroup(common::offset_t nodeOffset); private: - common::table_id_t tableID; - std::vector> dataTypes; + std::vector dataTypes; MemoryManager* mm; std::unordered_map> nodeGroups; }; +class Column; +class LocalTable { +public: + LocalTable(common::table_id_t tableID, common::TableType tableType) + : tableID{tableID}, tableType{tableType} {}; + + LocalTableData* getOrCreateLocalTableData( + const std::vector>& columns, MemoryManager* mm); + inline LocalTableData* getLocalTableData() { return localTableData.get(); } + +private: + common::table_id_t tableID; + common::TableType tableType; + std::unique_ptr localTableData; +}; + } // namespace storage } // namespace kuzu diff --git a/src/include/storage/store/column.h b/src/include/storage/store/column.h index 80f828e013..29741e58f6 100644 --- a/src/include/storage/store/column.h +++ b/src/include/storage/store/column.h @@ -25,6 +25,7 @@ using batch_lookup_func_t = read_values_to_page_func_t; class NullColumn; class StructColumn; +class LocalVectorCollection; class Column { friend class LocalColumn; friend class StringLocalColumn; diff --git a/src/include/storage/store/node_table.h b/src/include/storage/store/node_table.h index fbed6b6fec..e2f8f0ee55 100644 --- a/src/include/storage/store/node_table.h +++ b/src/include/storage/store/node_table.h @@ -67,8 +67,8 @@ class NodeTable : public Table { common::ValueVector* defaultValueVector) final; inline void dropColumn(common::column_id_t columnID) final { tableData->dropColumn(columnID); } - void prepareCommit(LocalTable* localTable) final; - void prepareRollback(LocalTable* localTable) final; + void prepareCommit(LocalTableData* localTable) final; + void prepareRollback(LocalTableData* localTable) final; void checkpointInMemory() final; void rollbackInMemory() final; diff --git a/src/include/storage/store/node_table_data.h b/src/include/storage/store/node_table_data.h index 8b159c31bd..4c92b11932 100644 --- a/src/include/storage/store/node_table_data.h +++ b/src/include/storage/store/node_table_data.h @@ -5,6 +5,8 @@ namespace kuzu { namespace storage { +class LocalTableData; + class NodeTableData : public TableData { public: NodeTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, common::table_id_t tableID, @@ -33,7 +35,7 @@ class NodeTableData : public TableData { void append(NodeGroup* nodeGroup) final; - void prepareLocalTableToCommit(LocalTable* localTable); + void prepareLocalTableToCommit(LocalTableData* localTable); }; } // namespace storage diff --git a/src/include/storage/store/rel_table.h b/src/include/storage/store/rel_table.h index b1b47d0020..5cd33cd383 100644 --- a/src/include/storage/store/rel_table.h +++ b/src/include/storage/store/rel_table.h @@ -44,8 +44,8 @@ class RelTable : public Table { bwdRelTableData->append(nodeGroup); } - void prepareCommit(LocalTable* localTable) final; - void prepareRollback(LocalTable* localTable) final; + void prepareCommit(LocalTableData* localTable) final; + void prepareRollback(LocalTableData* localTable) final; void checkpointInMemory() final; void rollbackInMemory() final; diff --git a/src/include/storage/store/rel_table_data.h b/src/include/storage/store/rel_table_data.h index 6e9c20e205..d323090d21 100644 --- a/src/include/storage/store/rel_table_data.h +++ b/src/include/storage/store/rel_table_data.h @@ -78,7 +78,7 @@ class RelTableData : public TableData { common::ColumnDataFormat::CSR; } - void prepareLocalTableToCommit(LocalTable* localTable); + void prepareLocalTableToCommit(LocalTableData* localTable); private: std::unique_ptr adjColumn; diff --git a/src/include/storage/store/table.h b/src/include/storage/store/table.h index 657f7969dd..3388b2e3d1 100644 --- a/src/include/storage/store/table.h +++ b/src/include/storage/store/table.h @@ -31,8 +31,8 @@ class Table { common::ValueVector* defaultValueVector) = 0; virtual void dropColumn(common::column_id_t columnID) = 0; - virtual void prepareCommit(LocalTable* localTable) = 0; - virtual void prepareRollback(LocalTable* localTable) = 0; + virtual void prepareCommit(LocalTableData* localTable) = 0; + virtual void prepareRollback(LocalTableData* localTable) = 0; virtual void checkpointInMemory() = 0; virtual void rollbackInMemory() = 0; diff --git a/src/include/storage/store/table_data.h b/src/include/storage/store/table_data.h index 8f55719441..be1d252964 100644 --- a/src/include/storage/store/table_data.h +++ b/src/include/storage/store/table_data.h @@ -12,6 +12,7 @@ struct TableReadState { std::vector columnIDs; }; +class LocalTableData; class TableData { public: virtual ~TableData() = default; @@ -43,7 +44,7 @@ class TableData { return columns[0]->getNumNodeGroups(transaction); } - virtual void prepareLocalTableToCommit(LocalTable* localTable) = 0; + virtual void prepareLocalTableToCommit(LocalTableData* localTable) = 0; virtual void checkpointInMemory(); virtual void rollbackInMemory(); diff --git a/src/include/transaction/transaction.h b/src/include/transaction/transaction.h index 4b2aa0fc7a..67f764efe3 100644 --- a/src/include/transaction/transaction.h +++ b/src/include/transaction/transaction.h @@ -2,11 +2,14 @@ #include -#include "storage/local_storage.h" +#include "storage/local_storage/local_storage.h" namespace kuzu { +namespace storage { +class LocalStorage; +class MemoryManager; +} // namespace storage namespace transaction { - class TransactionManager; enum class TransactionType : uint8_t { READ_ONLY, WRITE }; diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index b1d70f3dc3..b155d4b265 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -1,6 +1,7 @@ add_subdirectory(buffer_manager) add_subdirectory(compression) add_subdirectory(index) +add_subdirectory(local_storage) add_subdirectory(stats) add_subdirectory(storage_structure) add_subdirectory(store) @@ -9,8 +10,6 @@ add_subdirectory(wal) add_library(kuzu_storage OBJECT file_handle.cpp - local_storage.cpp - local_table.cpp storage_info.cpp storage_manager.cpp storage_utils.cpp diff --git a/src/storage/local_storage.cpp b/src/storage/local_storage.cpp deleted file mode 100644 index 5f63cb6d81..0000000000 --- a/src/storage/local_storage.cpp +++ /dev/null @@ -1,60 +0,0 @@ -#include "storage/local_storage.h" - -#include "storage/store/column.h" - -using namespace kuzu::common; -using namespace kuzu::transaction; - -namespace kuzu { -namespace storage { - -LocalStorage::LocalStorage(MemoryManager* mm) : mm{mm} {} - -void LocalStorage::scan(table_id_t tableID, ValueVector* nodeIDVector, - const std::vector& columnIDs, const std::vector& outputVectors) { - if (!tables.contains(tableID)) { - return; - } - tables.at(tableID)->scan(nodeIDVector, columnIDs, outputVectors); -} - -void LocalStorage::lookup(table_id_t tableID, ValueVector* nodeIDVector, - const std::vector& columnIDs, const std::vector& outputVectors) { - if (!tables.contains(tableID)) { - return; - } - tables.at(tableID)->lookup(nodeIDVector, columnIDs, outputVectors); -} - -void LocalStorage::insert(table_id_t tableID, ValueVector* nodeIDVector, - const std::vector& propertyVectors) { - KU_ASSERT(tables.contains(tableID)); - tables.at(tableID)->insert(nodeIDVector, propertyVectors); -} - -void LocalStorage::update(table_id_t tableID, ValueVector* nodeIDVector, column_id_t columnID, - ValueVector* propertyVector) { - KU_ASSERT(tables.contains(tableID)); - tables.at(tableID)->update(nodeIDVector, columnID, propertyVector); -} - -void LocalStorage::delete_(common::table_id_t tableID, common::ValueVector* nodeIDVector) { - if (!tables.contains(tableID)) { - return; - } - tables.at(tableID)->delete_(nodeIDVector); -} - -void LocalStorage::initializeLocalTable( - table_id_t tableID, const std::vector>& columns) { - if (!tables.contains(tableID)) { - std::vector> dataTypes; - for (auto& column : columns) { - dataTypes.emplace_back(column->getDataType()->copy()); - } - tables.emplace(tableID, std::make_unique(tableID, std::move(dataTypes), mm)); - } -} - -} // namespace storage -} // namespace kuzu diff --git a/src/storage/local_storage/CMakeLists.txt b/src/storage/local_storage/CMakeLists.txt new file mode 100644 index 0000000000..5a6dcd59a7 --- /dev/null +++ b/src/storage/local_storage/CMakeLists.txt @@ -0,0 +1,8 @@ +add_library(kuzu_storage_local_storage + OBJECT + local_table.cpp + local_storage.cpp) + +set(ALL_OBJECT_FILES + ${ALL_OBJECT_FILES} $ + PARENT_SCOPE) diff --git a/src/storage/local_storage/local_storage.cpp b/src/storage/local_storage/local_storage.cpp new file mode 100644 index 0000000000..bc3779f843 --- /dev/null +++ b/src/storage/local_storage/local_storage.cpp @@ -0,0 +1,38 @@ +#include "storage/local_storage/local_storage.h" + +#include "storage/local_storage/local_table.h" +#include "storage/store/column.h" + +using namespace kuzu::common; +using namespace kuzu::transaction; + +namespace kuzu { +namespace storage { + +LocalStorage::LocalStorage(MemoryManager* mm) : mm{mm} {} + +LocalTableData* LocalStorage::getOrCreateLocalTableData( + common::table_id_t tableID, const std::vector>& columns) { + if (!tables.contains(tableID)) { + tables.emplace(tableID, std::make_unique(tableID, TableType::NODE)); + } + return tables.at(tableID)->getOrCreateLocalTableData(columns, mm); +} + +LocalTableData* LocalStorage::getLocalTableData(common::table_id_t tableID) { + if (!tables.contains(tableID)) { + return nullptr; + } + return tables.at(tableID)->getLocalTableData(); +} + +std::unordered_set LocalStorage::getTableIDsWithUpdates() { + std::unordered_set tableSetToUpdate; + for (auto& [tableID, _] : tables) { + tableSetToUpdate.insert(tableID); + } + return tableSetToUpdate; +} + +} // namespace storage +} // namespace kuzu diff --git a/src/storage/local_table.cpp b/src/storage/local_storage/local_table.cpp similarity index 84% rename from src/storage/local_table.cpp rename to src/storage/local_storage/local_table.cpp index 79bc4c7dc2..f8f6ee5a05 100644 --- a/src/storage/local_table.cpp +++ b/src/storage/local_storage/local_table.cpp @@ -1,7 +1,8 @@ -#include "storage/local_table.h" +#include "storage/local_storage/local_table.h" #include "common/exception/message.h" #include "storage/storage_utils.h" +#include "storage/store/column.h" using namespace kuzu::common; @@ -87,6 +88,15 @@ void LocalVectorCollection::prepareAppend() { } } +LocalNodeGroup::LocalNodeGroup(std::vector dataTypes, MemoryManager* mm) { + columns.resize(dataTypes.size()); + for (auto i = 0u; i < dataTypes.size(); ++i) { + // To avoid unnecessary memory consumption, we chunk local changes of each column in the + // node group into chunks of size DEFAULT_VECTOR_CAPACITY. + columns[i] = std::make_unique(dataTypes[i], mm); + } +} + void LocalNodeGroup::scan(ValueVector* nodeIDVector, const std::vector& columnIDs, const std::vector& outputVectors) { KU_ASSERT(columnIDs.size() == outputVectors.size()); @@ -146,13 +156,13 @@ void LocalNodeGroup::delete_(ValueVector* nodeIDVector) { } } -void LocalTable::scan(ValueVector* nodeIDVector, const std::vector& columnIDs, +void LocalTableData::scan(ValueVector* nodeIDVector, const std::vector& columnIDs, const std::vector& outputVectors) { auto nodeGroupIdx = initializeLocalNodeGroup(nodeIDVector); nodeGroups.at(nodeGroupIdx)->scan(nodeIDVector, columnIDs, outputVectors); } -void LocalTable::lookup(ValueVector* nodeIDVector, const std::vector& columnIDs, +void LocalTableData::lookup(ValueVector* nodeIDVector, const std::vector& columnIDs, const std::vector& outputVectors) { for (auto i = 0u; i < nodeIDVector->state->selVector->selectedSize; i++) { auto nodeIDPos = nodeIDVector->state->selVector->selectedPositions[i]; @@ -168,20 +178,20 @@ void LocalTable::lookup(ValueVector* nodeIDVector, const std::vector& propertyVectors) { KU_ASSERT(nodeIDVector->state->selVector->selectedSize == 1); auto nodeGroupIdx = initializeLocalNodeGroup(nodeIDVector); nodeGroups.at(nodeGroupIdx)->insert(nodeIDVector, propertyVectors); } -void LocalTable::update( +void LocalTableData::update( ValueVector* nodeIDVector, column_id_t columnID, ValueVector* propertyVector) { auto nodeGroupIdx = initializeLocalNodeGroup(nodeIDVector); nodeGroups.at(nodeGroupIdx)->update(nodeIDVector, columnID, propertyVector); } -void LocalTable::delete_(ValueVector* nodeIDVector) { +void LocalTableData::delete_(ValueVector* nodeIDVector) { auto nodeIDPos = nodeIDVector->state->selVector->selectedPositions[0]; auto nodeOffset = nodeIDVector->getValue(nodeIDPos).offset; auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); @@ -191,13 +201,13 @@ void LocalTable::delete_(ValueVector* nodeIDVector) { nodeGroups.at(nodeGroupIdx)->delete_(nodeIDVector); } -node_group_idx_t LocalTable::initializeLocalNodeGroup(ValueVector* nodeIDVector) { +node_group_idx_t LocalTableData::initializeLocalNodeGroup(ValueVector* nodeIDVector) { auto nodeIDPos = nodeIDVector->state->selVector->selectedPositions[0]; auto nodeOffset = nodeIDVector->getValue(nodeIDPos).offset; return initializeLocalNodeGroup(nodeOffset); } -node_group_idx_t LocalTable::initializeLocalNodeGroup(offset_t nodeOffset) { +node_group_idx_t LocalTableData::initializeLocalNodeGroup(offset_t nodeOffset) { auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset); if (!nodeGroups.contains(nodeGroupIdx)) { nodeGroups.emplace(nodeGroupIdx, std::make_unique(dataTypes, mm)); @@ -205,5 +215,17 @@ node_group_idx_t LocalTable::initializeLocalNodeGroup(offset_t nodeOffset) { return nodeGroupIdx; } +LocalTableData* LocalTable::getOrCreateLocalTableData( + const std::vector>& columns, MemoryManager* mm) { + if (!localTableData) { + std::vector dataTypes; + for (auto& column : columns) { + dataTypes.push_back(column->getDataType()); + } + localTableData = std::make_unique(std::move(dataTypes), mm); + } + return localTableData.get(); +} + } // namespace storage } // namespace kuzu diff --git a/src/storage/storage_manager.cpp b/src/storage/storage_manager.cpp index 98e51efcb2..6bca678129 100644 --- a/src/storage/storage_manager.cpp +++ b/src/storage/storage_manager.cpp @@ -93,7 +93,7 @@ void StorageManager::prepareCommit(transaction::Transaction* transaction) { auto localStorage = transaction->getLocalStorage(); for (auto tableID : localStorage->getTableIDsWithUpdates()) { KU_ASSERT(tables.contains(tableID)); - tables.at(tableID)->prepareCommit(localStorage->getLocalTable(tableID)); + tables.at(tableID)->prepareCommit(localStorage->getLocalTableData(tableID)); } if (nodesStatisticsAndDeletedIDs->hasUpdates()) { wal->logTableStatisticsRecord(true /* isNodeTable */); @@ -115,7 +115,7 @@ void StorageManager::prepareRollback(transaction::Transaction* transaction) { auto localStorage = transaction->getLocalStorage(); for (auto tableID : localStorage->getTableIDsWithUpdates()) { KU_ASSERT(tables.contains(tableID)); - tables.at(tableID)->prepareRollback(localStorage->getLocalTable(tableID)); + tables.at(tableID)->prepareRollback(localStorage->getLocalTableData(tableID)); } } diff --git a/src/storage/store/node_table.cpp b/src/storage/store/node_table.cpp index 43b4c9336a..10fa2bba4c 100644 --- a/src/storage/store/node_table.cpp +++ b/src/storage/store/node_table.cpp @@ -108,7 +108,7 @@ void NodeTable::addColumn(transaction::Transaction* transaction, const catalog:: wal->addToUpdatedTables(tableID); } -void NodeTable::prepareCommit(LocalTable* localTable) { +void NodeTable::prepareCommit(LocalTableData* localTable) { if (pkIndex) { pkIndex->prepareCommit(); } @@ -116,7 +116,7 @@ void NodeTable::prepareCommit(LocalTable* localTable) { wal->addToUpdatedTables(tableID); } -void NodeTable::prepareRollback(LocalTable* localTable) { +void NodeTable::prepareRollback(LocalTableData* localTable) { if (pkIndex) { pkIndex->prepareRollback(); } diff --git a/src/storage/store/node_table_data.cpp b/src/storage/store/node_table_data.cpp index de226c75b4..95e89d1ca7 100644 --- a/src/storage/store/node_table_data.cpp +++ b/src/storage/store/node_table_data.cpp @@ -1,5 +1,6 @@ #include "storage/store/node_table_data.h" +#include "storage/local_storage/local_table.h" #include "storage/stats/nodes_store_statistics.h" using namespace kuzu::catalog; @@ -38,8 +39,10 @@ void NodeTableData::scan(Transaction* transaction, TableReadState& readState, } } if (transaction->isWriteTransaction()) { - transaction->getLocalStorage()->scan( - tableID, nodeIDVector, readState.columnIDs, outputVectors); + auto localTableData = transaction->getLocalStorage()->getLocalTableData(tableID); + if (localTableData) { + localTableData->scan(nodeIDVector, readState.columnIDs, outputVectors); + } } } @@ -55,23 +58,23 @@ void NodeTableData::insert(Transaction* transaction, ValueVector* nodeIDVector, newNodeGroup->finalize(currentNumNodeGroups); append(newNodeGroup.get()); } - auto localStorage = transaction->getLocalStorage(); - localStorage->initializeLocalTable(tableID, columns); - localStorage->insert(tableID, nodeIDVector, propertyVectors); + auto localTableData = + transaction->getLocalStorage()->getOrCreateLocalTableData(tableID, columns); + localTableData->insert(nodeIDVector, propertyVectors); } void NodeTableData::update(Transaction* transaction, column_id_t columnID, ValueVector* nodeIDVector, ValueVector* propertyVector) { KU_ASSERT(columnID < columns.size()); - auto localStorage = transaction->getLocalStorage(); - localStorage->initializeLocalTable(tableID, columns); - localStorage->update(tableID, nodeIDVector, columnID, propertyVector); + auto localTableData = + transaction->getLocalStorage()->getOrCreateLocalTableData(tableID, columns); + localTableData->update(nodeIDVector, columnID, propertyVector); } void NodeTableData::delete_(Transaction* transaction, ValueVector* nodeIDVector) { - auto localStorage = transaction->getLocalStorage(); - localStorage->initializeLocalTable(tableID, columns); - localStorage->delete_(tableID, nodeIDVector); + auto localTableData = + transaction->getLocalStorage()->getOrCreateLocalTableData(tableID, columns); + localTableData->delete_(nodeIDVector); } void NodeTableData::lookup(Transaction* transaction, TableReadState& readState, @@ -87,8 +90,10 @@ void NodeTableData::lookup(Transaction* transaction, TableReadState& readState, } } if (transaction->isWriteTransaction()) { - transaction->getLocalStorage()->lookup( - tableID, nodeIDVector, readState.columnIDs, outputVectors); + auto localTableData = transaction->getLocalStorage()->getLocalTableData(tableID); + if (localTableData) { + localTableData->lookup(nodeIDVector, readState.columnIDs, outputVectors); + } } } @@ -100,7 +105,7 @@ void NodeTableData::append(kuzu::storage::NodeGroup* nodeGroup) { } } -void NodeTableData::prepareLocalTableToCommit(LocalTable* localTable) { +void NodeTableData::prepareLocalTableToCommit(LocalTableData* localTable) { auto numNodeGroups = getNumNodeGroups(&DUMMY_WRITE_TRANSACTION); for (auto& [nodeGroupIdx, nodeGroup] : localTable->nodeGroups) { for (auto columnID = 0; columnID < columns.size(); columnID++) { diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp index ea120aae12..d9ae88d0a8 100644 --- a/src/storage/store/rel_table.cpp +++ b/src/storage/store/rel_table.cpp @@ -61,11 +61,11 @@ void RelTable::addColumn( wal->addToUpdatedTables(tableID); } -void RelTable::prepareCommit(LocalTable* /*localTable*/) { +void RelTable::prepareCommit(LocalTableData* /*localTable*/) { wal->addToUpdatedTables(tableID); } -void RelTable::prepareRollback(LocalTable* localTable) { +void RelTable::prepareRollback(LocalTableData* localTable) { // DO NOTHING } diff --git a/src/storage/store/rel_table_data.cpp b/src/storage/store/rel_table_data.cpp index 85ccbeddac..fa2753d5a2 100644 --- a/src/storage/store/rel_table_data.cpp +++ b/src/storage/store/rel_table_data.cpp @@ -45,28 +45,27 @@ RelTableData::RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, csrOffsetColumn{nullptr} { if (dataFormat == ColumnDataFormat::CSR) { auto csrOffsetMetadataDAHInfo = relsStoreStats->getCSROffsetMetadataDAHInfo( - Transaction::getDummyWriteTrx().get(), tableID, direction); + &DUMMY_WRITE_TRANSACTION, tableID, direction); // No NULL values is allowed for the csr offset column. csrOffsetColumn = std::make_unique(LogicalType::INT64(), *csrOffsetMetadataDAHInfo, - dataFH, metadataFH, bufferManager, wal, Transaction::getDummyReadOnlyTrx().get(), + dataFH, metadataFH, bufferManager, wal, &DUMMY_READ_TRANSACTION, RWPropertyStats::empty(), enableCompression, false /* requireNUllColumn */); } - auto adjMetadataDAHInfo = relsStoreStats->getAdjMetadataDAHInfo( - Transaction::getDummyWriteTrx().get(), tableID, direction); + auto adjMetadataDAHInfo = + relsStoreStats->getAdjMetadataDAHInfo(&DUMMY_WRITE_TRANSACTION, tableID, direction); adjColumn = ColumnFactory::createColumn(LogicalType::INTERNAL_ID(), *adjMetadataDAHInfo, dataFH, - metadataFH, bufferManager, wal, Transaction::getDummyReadOnlyTrx().get(), - RWPropertyStats::empty(), enableCompression); + metadataFH, bufferManager, wal, &DUMMY_READ_TRANSACTION, RWPropertyStats::empty(), + enableCompression); auto properties = tableSchema->getProperties(); columns.reserve(properties.size()); for (auto i = 0u; i < properties.size(); i++) { auto property = tableSchema->getProperties()[i]; auto metadataDAHInfo = relsStoreStats->getPropertyMetadataDAHInfo( - Transaction::getDummyWriteTrx().get(), tableID, i, direction); - columns.push_back( - ColumnFactory::createColumn(properties[i]->getDataType()->copy(), *metadataDAHInfo, - dataFH, metadataFH, bufferManager, wal, Transaction::getDummyReadOnlyTrx().get(), - RWPropertyStats(relsStoreStats, tableID, property->getPropertyID()), - enableCompression)); + &DUMMY_WRITE_TRANSACTION, tableID, i, direction); + columns.push_back(ColumnFactory::createColumn(properties[i]->getDataType()->copy(), + *metadataDAHInfo, dataFH, metadataFH, bufferManager, wal, &DUMMY_READ_TRANSACTION, + RWPropertyStats(relsStoreStats, tableID, property->getPropertyID()), + enableCompression)); } // Set common tableID for adjColumn and relIDColumn. dynamic_cast(adjColumn.get()) @@ -170,7 +169,7 @@ void RelTableData::append(NodeGroup* nodeGroup) { } } -void RelTableData::prepareLocalTableToCommit(LocalTable* /*localTable*/) { +void RelTableData::prepareLocalTableToCommit(LocalTableData* /*localTable*/) { KU_UNREACHABLE; }