diff --git a/src/include/processor/operator/copy/copy_node.h b/src/include/processor/operator/copy/copy_node.h index 143cca9955b..f8e9f7596f2 100644 --- a/src/include/processor/operator/copy/copy_node.h +++ b/src/include/processor/operator/copy/copy_node.h @@ -1,7 +1,6 @@ #pragma once #include "processor/operator/sink.h" -#include "storage/copier/node_copy_executor.h" #include "storage/in_mem_storage_structure/in_mem_column.h" #include "storage/store/node_table.h" @@ -31,6 +30,7 @@ class CopyNodeSharedState { uint64_t& numRows; std::mutex mtx; std::shared_ptr table; + bool hasLoggedWAL; }; struct CopyNodeLocalState { diff --git a/src/include/processor/operator/scan/scan_rel_table_columns.h b/src/include/processor/operator/scan/scan_rel_table_columns.h index 3c04f1dff1a..22684faedd4 100644 --- a/src/include/processor/operator/scan/scan_rel_table_columns.h +++ b/src/include/processor/operator/scan/scan_rel_table_columns.h @@ -2,27 +2,30 @@ #include "processor/operator/filtering_operator.h" #include "processor/operator/scan/scan_rel_table.h" +#include "storage/store/rels_statistics.h" namespace kuzu { namespace processor { class ScanRelTableColumns : public ScanRelTable, public SelVectorOverWriter { public: - ScanRelTableColumns(storage::DirectedRelTableData* tableData, std::vector propertyIds, - const DataPos& inNodeIDVectorPos, std::vector outputVectorsPos, - std::unique_ptr child, uint32_t id, const std::string& paramsString) + ScanRelTableColumns(storage::DirectedRelTableData* tableData, storage::RelStatistics* relStats, + std::vector propertyIds, const DataPos& inNodeIDVectorPos, + std::vector outputVectorsPos, std::unique_ptr child, uint32_t id, + const std::string& paramsString) : ScanRelTable{inNodeIDVectorPos, std::move(outputVectorsPos), PhysicalOperatorType::SCAN_REL_TABLE_COLUMNS, std::move(child), id, paramsString}, tableData{tableData} { scanState = std::make_unique( - std::move(propertyIds), storage::RelTableDataType::COLUMNS); + relStats, std::move(propertyIds), storage::RelTableDataType::COLUMNS); } bool getNextTuplesInternal(ExecutionContext* context) override; inline std::unique_ptr clone() override { - return std::make_unique(tableData, scanState->propertyIds, - inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id, paramsString); + return std::make_unique(tableData, scanState->relStats, + scanState->propertyIds, inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id, + paramsString); } private: diff --git a/src/include/processor/operator/scan/scan_rel_table_lists.h b/src/include/processor/operator/scan/scan_rel_table_lists.h index 19d51258323..f92b21eae84 100644 --- a/src/include/processor/operator/scan/scan_rel_table_lists.h +++ b/src/include/processor/operator/scan/scan_rel_table_lists.h @@ -8,21 +8,23 @@ namespace processor { class ScanRelTableLists : public ScanRelTable { public: - ScanRelTableLists(storage::DirectedRelTableData* tableData, std::vector propertyIds, - const DataPos& inNodeIDVectorPos, std::vector outputVectorsPos, - std::unique_ptr child, uint32_t id, const std::string& paramsString) + ScanRelTableLists(storage::DirectedRelTableData* tableData, storage::RelStatistics* relStats, + std::vector propertyIds, const DataPos& inNodeIDVectorPos, + std::vector outputVectorsPos, std::unique_ptr child, uint32_t id, + const std::string& paramsString) : ScanRelTable{inNodeIDVectorPos, std::move(outputVectorsPos), PhysicalOperatorType::SCAN_REL_TABLE_LISTS, std::move(child), id, paramsString}, tableData{tableData} { scanState = std::make_unique( - 
std::move(propertyIds), storage::RelTableDataType::LISTS); + relStats, std::move(propertyIds), storage::RelTableDataType::LISTS); } bool getNextTuplesInternal(ExecutionContext* context) override; inline std::unique_ptr clone() override { - return make_unique(tableData, scanState->propertyIds, inNodeIDVectorPos, - outputVectorsPos, children[0]->clone(), id, paramsString); + return make_unique(tableData, scanState->relStats, + scanState->propertyIds, inNodeIDVectorPos, outputVectorsPos, children[0]->clone(), id, + paramsString); } private: diff --git a/src/include/storage/copier/rel_copy_executor.h b/src/include/storage/copier/rel_copy_executor.h index 00fe2e9d85b..1017885c98c 100644 --- a/src/include/storage/copier/rel_copy_executor.h +++ b/src/include/storage/copier/rel_copy_executor.h @@ -40,7 +40,7 @@ class DirectedInMemRelData { class RelCopyExecutor { public: - RelCopyExecutor(common::CopyDescription& copyDescription, std::string outputDirectory, + RelCopyExecutor(common::CopyDescription& copyDescription, WAL* wal, common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, storage::NodesStore& nodesStore, storage::RelTable* table, RelsStatistics* relsStatistics); @@ -58,6 +58,7 @@ class RelCopyExecutor { private: common::CopyDescription& copyDescription; + WAL* wal; std::string outputDirectory; std::unordered_map fileBlockInfos; common::TaskScheduler& taskScheduler; diff --git a/src/include/storage/storage_structure/lists/lists.h b/src/include/storage/storage_structure/lists/lists.h index 40be17cea55..98619230580 100644 --- a/src/include/storage/storage_structure/lists/lists.h +++ b/src/include/storage/storage_structure/lists/lists.h @@ -186,27 +186,27 @@ class AdjLists : public Lists { bufferManager, false /* hasNullBytes */, wal, listsUpdatesStore}, nbrTableID{nbrTableID} {}; - inline bool mayContainNulls() const override { return false; } + inline bool mayContainNulls() const final { return false; } void readValues(transaction::Transaction* transaction, common::ValueVector* valueVector, - ListHandle& listHandle) override; + ListHandle& listHandle) final; // Currently, used only in copyCSV tests. 
std::unique_ptr> readAdjacencyListOfNode( common::offset_t nodeOffset); - void checkpointInMemoryIfNecessary() override { + inline void checkpointInMemoryIfNecessary() final { headers->checkpointInMemoryIfNecessary(); Lists::checkpointInMemoryIfNecessary(); } - void rollbackInMemoryIfNecessary() override { + inline void rollbackInMemoryIfNecessary() final { headers->rollbackInMemoryIfNecessary(); Lists::rollbackInMemoryIfNecessary(); } private: - void readFromList(common::ValueVector* valueVector, ListHandle& listHandle) override; + void readFromList(common::ValueVector* valueVector, ListHandle& listHandle) final; void readFromListsUpdatesStore(ListHandle& listHandle, common::ValueVector* valueVector); void readFromPersistentStore(ListHandle& listHandle, common::ValueVector* valueVector); diff --git a/src/include/storage/storage_structure/storage_structure.h b/src/include/storage/storage_structure/storage_structure.h index 4afac65d766..b55abcf0d7b 100644 --- a/src/include/storage/storage_structure/storage_structure.h +++ b/src/include/storage/storage_structure/storage_structure.h @@ -106,8 +106,6 @@ class BaseColumnOrList : public StorageStructure { uint16_t pagePosOfFirstElement, uint64_t numValuesToRead, common::table_id_t commonTableID, bool hasNoNullGuarantee); - void setNullBitOfAPosInFrame(const uint8_t* frame, uint16_t elementPos, bool isNull) const; - void readNullBitsFromAPage(common::ValueVector* valueVector, const uint8_t* frame, uint64_t posInPage, uint64_t posInVector, uint64_t numBitsToRead) const; diff --git a/src/include/storage/store/node_table.h b/src/include/storage/store/node_table.h index c30fdc127ab..9ebdfaa0b80 100644 --- a/src/include/storage/store/node_table.h +++ b/src/include/storage/store/node_table.h @@ -16,8 +16,8 @@ class NodeTable { BufferManager& bufferManager, WAL* wal, catalog::NodeTableSchema* nodeTableSchema); void initializeData(catalog::NodeTableSchema* nodeTableSchema); - - void resetColumns(catalog::NodeTableSchema* nodeTableSchema); + static std::unordered_map> initializeColumns( + WAL* wal, BufferManager* bm, catalog::NodeTableSchema* nodeTableSchema); inline common::offset_t getMaxNodeOffset(transaction::Transaction* trx) const { return nodesStatisticsAndDeletedIDs->getMaxNodeOffset(trx, tableID); diff --git a/src/include/storage/store/rel_table.h b/src/include/storage/store/rel_table.h index 827d065898b..c967b8251fb 100644 --- a/src/include/storage/store/rel_table.h +++ b/src/include/storage/store/rel_table.h @@ -5,6 +5,7 @@ #include "storage/storage_structure/column.h" #include "storage/storage_structure/lists/lists.h" #include "storage/storage_utils.h" +#include "storage/store/rels_statistics.h" namespace kuzu { namespace storage { @@ -40,9 +41,10 @@ class ListsUpdateIteratorsForDirection { struct RelTableScanState { public: - RelTableScanState( + RelTableScanState(storage::RelStatistics* relStats, std::vector propertyIds, RelTableDataType relTableDataType) - : relTableDataType{relTableDataType}, propertyIds{std::move(propertyIds)} { + : relStats{relStats}, relTableDataType{relTableDataType}, propertyIds{ + std::move(propertyIds)} { if (relTableDataType == RelTableDataType::LISTS) { syncState = std::make_unique(); // The first listHandle is for adj lists. 
@@ -58,6 +60,7 @@ struct RelTableScanState { syncState->hasMoreAndSwitchSourceIfNecessary(); } + RelStatistics* relStats; RelTableDataType relTableDataType; std::vector propertyIds; // sync state between adj and property lists @@ -98,6 +101,9 @@ class DirectedRelTableData { inline void scan(transaction::Transaction* transaction, RelTableScanState& scanState, common::ValueVector* inNodeIDVector, const std::vector& outputVectors) { + if (scanState.relStats->getNumTuples() == 0) { + return; + } if (scanState.relTableDataType == RelTableDataType::COLUMNS) { scanColumns(transaction, scanState, inNodeIDVector, outputVectors); } else { diff --git a/src/include/storage/wal_replayer_utils.h b/src/include/storage/wal_replayer_utils.h index f6c20a1d6f9..e334c070968 100644 --- a/src/include/storage/wal_replayer_utils.h +++ b/src/include/storage/wal_replayer_utils.h @@ -12,20 +12,6 @@ namespace storage { class WALReplayerUtils { public: - static inline void replaceNodeFilesWithVersionFromWALIfExists( - catalog::NodeTableSchema* nodeTableSchema, const std::string& directory) { - fileOperationOnNodeFiles(nodeTableSchema, directory, - replaceOriginalColumnFilesWithWALVersionIfExists, - replaceOriginalListFilesWithWALVersionIfExists); - } - - static inline void replaceRelPropertyFilesWithVersionFromWALIfExists( - catalog::RelTableSchema* relTableSchema, const std::string& directory) { - fileOperationOnRelFiles(relTableSchema, directory, - replaceOriginalColumnFilesWithWALVersionIfExists, - replaceOriginalListFilesWithWALVersionIfExists); - } - static inline void removeDBFilesForNodeTable( catalog::NodeTableSchema* tableSchema, const std::string& directory) { fileOperationOnNodeFiles( @@ -63,10 +49,6 @@ class WALReplayerUtils { static void renameDBFilesForRelProperty(const std::string& directory, catalog::RelTableSchema* relTableSchema, common::property_id_t propertyID); - static void replaceListsHeadersFilesWithVersionFromWALIfExists( - const std::unordered_set& relTableSchemas, - common::table_id_t boundTableID, const std::string& directory); - private: static inline void removeColumnFilesForPropertyIfExists(const std::string& directory, common::table_id_t relTableID, common::table_id_t boundTableID, diff --git a/src/main/database.cpp b/src/main/database.cpp index cb47f583a39..faf5ea11256 100644 --- a/src/main/database.cpp +++ b/src/main/database.cpp @@ -176,15 +176,9 @@ void Database::rollbackAndClearWAL() { void Database::recoverIfNecessary() { if (!wal->isEmptyWAL()) { - if (wal->isLastLoggedRecordCommit()) { - logger->info("Starting up StorageManager and found a non-empty WAL with a committed " - "transaction. Replaying to checkpointInMemory."); - checkpointAndClearWAL(WALReplayMode::RECOVERY_CHECKPOINT); - } else { - logger->info("Starting up StorageManager and found a non-empty WAL but last record is " - "not commit. Clearing the WAL."); - wal->clearWAL(); - } + logger->info("Starting up StorageManager and found a non-empty WAL with a committed " + "transaction. 
Replaying to checkpointInMemory."); + checkpointAndClearWAL(WALReplayMode::RECOVERY_CHECKPOINT); } } diff --git a/src/processor/mapper/map_extend.cpp b/src/processor/mapper/map_extend.cpp index aba6bd42eaa..d9d39f6c583 100644 --- a/src/processor/mapper/map_extend.cpp +++ b/src/processor/mapper/map_extend.cpp @@ -26,7 +26,7 @@ static std::vector populatePropertyIds( static std::pair> getRelTableDataAndScanState(RelDataDirection direction, catalog::RelTableSchema* relTableSchema, - table_id_t boundNodeTableID, const RelsStore& relsStore, table_id_t relTableID, + table_id_t boundNodeTableID, RelsStore& relsStore, table_id_t relTableID, const expression_vector& properties) { if (relTableSchema->getBoundTableID(direction) != boundNodeTableID) { // No data stored for given direction and boundNode. @@ -40,17 +40,17 @@ getRelTableDataAndScanState(RelDataDirection direction, catalog::RelTableSchema* propertyExpression->getPropertyID(relTableID) : INVALID_PROPERTY_ID); } - auto scanState = make_unique( - std::move(propertyIds), relsStore.isSingleMultiplicityInDirection(direction, relTableID) ? - RelTableDataType::COLUMNS : - RelTableDataType::LISTS); + auto relStats = relsStore.getRelsStatistics().getRelStatistics(relTableID); + auto scanState = make_unique(relStats, std::move(propertyIds), + relsStore.isSingleMultiplicityInDirection(direction, relTableID) ? + RelTableDataType::COLUMNS : + RelTableDataType::LISTS); return std::make_pair(relData, std::move(scanState)); } static std::unique_ptr populateRelTableDataCollection( table_id_t boundNodeTableID, const RelExpression& rel, ExtendDirection extendDirection, - const expression_vector& properties, const RelsStore& relsStore, - const catalog::Catalog& catalog) { + const expression_vector& properties, RelsStore& relsStore, const catalog::Catalog& catalog) { std::vector relTableDatas; std::vector> tableScanStates; for (auto relTableID : rel.getTableIDs()) { @@ -120,19 +120,22 @@ std::unique_ptr PlanMapper::mapLogicalExtendToPhysical( extendDirection != planner::ExtendDirection::BOTH) { auto relDataDirection = ExtendDirectionUtils::getRelDataDirection(extendDirection); auto relTableID = rel->getSingleTableID(); + auto relTableStats = relsStore.getRelsStatistics().getRelStatistics(relTableID); if (relsStore.isSingleMultiplicityInDirection(relDataDirection, relTableID)) { auto propertyIds = populatePropertyIds(relTableID, extend->getProperties()); return make_unique( relsStore.getRelTable(relTableID)->getDirectedTableData(relDataDirection), - std::move(propertyIds), inNodeIDVectorPos, std::move(outputVectorsPos), - std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting()); + relTableStats, std::move(propertyIds), inNodeIDVectorPos, + std::move(outputVectorsPos), std::move(prevOperator), getOperatorID(), + extend->getExpressionsForPrinting()); } else { assert(!relsStore.isSingleMultiplicityInDirection(relDataDirection, relTableID)); auto propertyIds = populatePropertyIds(relTableID, extend->getProperties()); return make_unique( relsStore.getRelTable(relTableID)->getDirectedTableData(relDataDirection), - std::move(propertyIds), inNodeIDVectorPos, std::move(outputVectorsPos), - std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting()); + relTableStats, std::move(propertyIds), inNodeIDVectorPos, + std::move(outputVectorsPos), std::move(prevOperator), getOperatorID(), + extend->getExpressionsForPrinting()); } } else { // map to generic extend std::unordered_map> diff --git a/src/processor/operator/copy/copy_node.cpp 
b/src/processor/operator/copy/copy_node.cpp index 694e1b165f9..fe23b1cbc0c 100644 --- a/src/processor/operator/copy/copy_node.cpp +++ b/src/processor/operator/copy/copy_node.cpp @@ -6,7 +6,7 @@ namespace kuzu { namespace processor { CopyNodeSharedState::CopyNodeSharedState(uint64_t& numRows, storage::MemoryManager* memoryManager) - : numRows{numRows}, pkColumnID{0} { + : numRows{numRows}, pkColumnID{0}, hasLoggedWAL{false} { auto ftTableSchema = std::make_unique(); ftTableSchema->appendColumn( std::make_unique(false /* flat */, 0 /* dataChunkPos */, @@ -20,7 +20,7 @@ void CopyNodeSharedState::initializePrimaryKey( common::LogicalTypeID::SERIAL) { pkIndex = std::make_unique( storage::StorageUtils::getNodeIndexFName( - directory, nodeTableSchema->tableID, common::DBFileType::WAL_VERSION), + directory, nodeTableSchema->tableID, common::DBFileType::ORIGINAL), nodeTableSchema->getPrimaryKey().dataType); pkIndex->bulkReserve(numRows); } @@ -40,13 +40,22 @@ void CopyNodeSharedState::initializeColumns( // Skip SERIAL, as it is not physically stored. continue; } - auto fPath = storage::StorageUtils::getNodePropertyColumnFName(directory, - nodeTableSchema->tableID, property.propertyID, common::DBFileType::WAL_VERSION); + auto fPath = storage::StorageUtils::getNodePropertyColumnFName( + directory, nodeTableSchema->tableID, property.propertyID, common::DBFileType::ORIGINAL); columns.push_back(std::make_unique(fPath, property.dataType)); } } void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) { + { + std::unique_lock xLck{sharedState->mtx}; + if (!sharedState->hasLoggedWAL) { + localState->wal->logCopyNodeRecord(localState->table->getTableID()); + localState->wal->flushAllPages(); + sharedState->hasLoggedWAL = true; + } + } + if (sharedState->hasLoggedWAL) {} while (children[0]->getNextTuple(context)) { std::vector> columnChunks; columnChunks.reserve(sharedState->columns.size()); @@ -69,13 +78,13 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) { } void CopyNode::finalize(kuzu::processor::ExecutionContext* context) { + auto tableID = localState->table->getTableID(); if (sharedState->pkIndex) { sharedState->pkIndex->flush(); } for (auto& column : sharedState->columns) { column->saveToFile(); } - auto tableID = localState->table->getTableID(); for (auto& relTableSchema : localState->catalog->getAllRelTableSchemasContainBoundTable(tableID)) { localState->relsStore->getRelTable(relTableSchema->tableID) @@ -83,7 +92,6 @@ void CopyNode::finalize(kuzu::processor::ExecutionContext* context) { } localState->table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable( tableID, sharedState->numRows); - localState->wal->logCopyNodeRecord(tableID); auto outputMsg = common::StringUtils::string_format( "{} number of tuples has been copied to table: {}.", sharedState->numRows, localState->catalog->getReadOnlyVersion()->getTableName(tableID).c_str()); diff --git a/src/processor/operator/copy/copy_rel.cpp b/src/processor/operator/copy/copy_rel.cpp index 094345ba416..6cf160b57d8 100644 --- a/src/processor/operator/copy/copy_rel.cpp +++ b/src/processor/operator/copy/copy_rel.cpp @@ -9,10 +9,9 @@ namespace processor { uint64_t CopyRel::executeInternal( kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) { - auto relCopier = make_unique(copyDescription, wal->getDirectory(), - *taskScheduler, *catalog, nodesStore, table, relsStatistics); + auto relCopier = std::make_unique( + copyDescription, wal, *taskScheduler, *catalog, nodesStore, table, 
relsStatistics); auto numRelsCopied = relCopier->copy(executionContext); - wal->logCopyRelRecord(tableID); return numRelsCopied; } diff --git a/src/processor/operator/scan/generic_scan_rel_tables.cpp b/src/processor/operator/scan/generic_scan_rel_tables.cpp index e6e8c32d5c9..e05f55d7cad 100644 --- a/src/processor/operator/scan/generic_scan_rel_tables.cpp +++ b/src/processor/operator/scan/generic_scan_rel_tables.cpp @@ -44,8 +44,8 @@ bool RelTableDataCollection::scan(ValueVector* inVector, std::unique_ptr RelTableDataCollection::clone() const { std::vector> clonedScanStates; for (auto& scanState : tableScanStates) { - clonedScanStates.push_back( - make_unique(scanState->propertyIds, scanState->relTableDataType)); + clonedScanStates.push_back(make_unique( + scanState->relStats, scanState->propertyIds, scanState->relTableDataType)); } return make_unique(relTableDatas, std::move(clonedScanStates)); } diff --git a/src/storage/copier/CMakeLists.txt b/src/storage/copier/CMakeLists.txt index 35a024b2095..5b35af93dec 100644 --- a/src/storage/copier/CMakeLists.txt +++ b/src/storage/copier/CMakeLists.txt @@ -1,7 +1,7 @@ add_library(kuzu_storage_in_mem_csv_copier OBJECT - node_copy_executor.cpp node_copier.cpp + node_copy_executor.cpp npy_reader.cpp rel_copier.cpp rel_copy_executor.cpp diff --git a/src/storage/copier/node_copier.cpp b/src/storage/copier/node_copier.cpp index 4f76fe7d2a8..99e3e3be9f3 100644 --- a/src/storage/copier/node_copier.cpp +++ b/src/storage/copier/node_copier.cpp @@ -33,7 +33,7 @@ NodeCopier::NodeCopier(const std::string& directory, std::shared_ptrtableID, property.propertyID, DBFileType::WAL_VERSION); + directory, schema->tableID, property.propertyID, DBFileType::ORIGINAL); columns.push_back(std::make_shared(fPath, property.dataType)); } // Each property corresponds to a column. 
@@ -50,7 +50,7 @@ void NodeCopier::initializeIndex( return; } pkIndex = std::make_unique( - StorageUtils::getNodeIndexFName(directory, schema->tableID, DBFileType::WAL_VERSION), + StorageUtils::getNodeIndexFName(directory, schema->tableID, DBFileType::ORIGINAL), primaryKey.dataType); pkIndex->bulkReserve(numTuples); pkColumnID = getPKColumnID(schema->properties, primaryKey.propertyID); diff --git a/src/storage/copier/rel_copy_executor.cpp b/src/storage/copier/rel_copy_executor.cpp index 63e1d17512f..5378a592e6e 100644 --- a/src/storage/copier/rel_copy_executor.cpp +++ b/src/storage/copier/rel_copy_executor.cpp @@ -8,10 +8,10 @@ using namespace kuzu::catalog; namespace kuzu { namespace storage { -RelCopyExecutor::RelCopyExecutor(CopyDescription& copyDescription, std::string outputDirectory, +RelCopyExecutor::RelCopyExecutor(CopyDescription& copyDescription, WAL* wal, TaskScheduler& taskScheduler, Catalog& catalog, NodesStore& nodesStore, RelTable* table, RelsStatistics* relsStatistics) - : copyDescription{copyDescription}, outputDirectory{std::move(outputDirectory)}, + : copyDescription{copyDescription}, wal{wal}, outputDirectory{std::move(wal->getDirectory())}, taskScheduler{taskScheduler}, catalog{catalog}, tableSchema{catalog.getReadOnlyVersion()->getRelTableSchema(table->getRelTableID())}, numTuples{0}, nodesStore{nodesStore}, table{table}, relsStatistics{relsStatistics} { @@ -36,15 +36,15 @@ std::unique_ptr RelCopyExecutor::initializeDirectedInMemRe auto relColumns = std::make_unique(); relColumns->adjColumn = std::make_unique( StorageUtils::getAdjColumnFName( - outputDirectory, tableSchema->tableID, direction, DBFileType::WAL_VERSION), + outputDirectory, tableSchema->tableID, direction, DBFileType::ORIGINAL), LogicalType(LogicalTypeID::INTERNAL_ID)); relColumns->adjColumnChunk = relColumns->adjColumn->getInMemColumnChunk(0, numNodes - 1, ©Description); for (auto i = 0u; i < tableSchema->getNumProperties(); ++i) { auto propertyID = tableSchema->properties[i].propertyID; auto propertyDataType = tableSchema->properties[i].dataType; - auto fName = StorageUtils::getRelPropertyColumnFName(outputDirectory, - tableSchema->tableID, direction, propertyID, DBFileType::WAL_VERSION); + auto fName = StorageUtils::getRelPropertyColumnFName( + outputDirectory, tableSchema->tableID, direction, propertyID, DBFileType::ORIGINAL); relColumns->propertyColumns.emplace( propertyID, std::make_unique(fName, propertyDataType)); relColumns->propertyColumnChunks.emplace( @@ -57,14 +57,14 @@ std::unique_ptr RelCopyExecutor::initializeDirectedInMemRe auto relLists = std::make_unique(); relLists->adjList = std::make_unique( StorageUtils::getAdjListsFName( - outputDirectory, tableSchema->tableID, direction, DBFileType::WAL_VERSION), + outputDirectory, tableSchema->tableID, direction, DBFileType::ORIGINAL), numNodes); relLists->relListsSizes = std::make_unique(numNodes); for (auto i = 0u; i < tableSchema->getNumProperties(); ++i) { auto propertyID = tableSchema->properties[i].propertyID; auto propertyDataType = tableSchema->properties[i].dataType; - auto fName = StorageUtils::getRelPropertyListsFName(outputDirectory, - tableSchema->tableID, direction, propertyID, DBFileType::WAL_VERSION); + auto fName = StorageUtils::getRelPropertyListsFName( + outputDirectory, tableSchema->tableID, direction, propertyID, DBFileType::ORIGINAL); relLists->propertyLists.emplace(propertyID, InMemListsFactory::getInMemPropertyLists(fName, propertyDataType, numNodes, ©Description, relLists->adjList->getListHeadersBuilder())); @@ -75,6 
+75,9 @@ std::unique_ptr RelCopyExecutor::initializeDirectedInMemRe } offset_t RelCopyExecutor::copy(processor::ExecutionContext* executionContext) { + wal->logCopyRelRecord(table->getRelTableID()); + // We assume that COPY is a single-statement transaction, thus COPY rel is the only wal record. + wal->flushAllPages(); countRelListsSizeAndPopulateColumns(executionContext); if (!tableSchema->isSingleMultiplicityInDirection(FWD) || !tableSchema->isSingleMultiplicityInDirection(BWD)) { diff --git a/src/storage/storage_structure/storage_structure.cpp b/src/storage/storage_structure/storage_structure.cpp index 67b5054ee54..f47b4cda9c2 100644 --- a/src/storage/storage_structure/storage_structure.cpp +++ b/src/storage/storage_structure/storage_structure.cpp @@ -125,18 +125,5 @@ void BaseColumnOrList::readAPageBySequentialCopy(Transaction* transaction, Value }); } -void BaseColumnOrList::setNullBitOfAPosInFrame( - const uint8_t* frame, uint16_t elementPosInPage, bool isNull) const { - auto nullMask = (uint64_t*)getNullBufferInPage(frame); - auto nullEntryPos = elementPosInPage >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2; - auto bitOffsetInEntry = - elementPosInPage - (nullEntryPos << NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2); - if (isNull) { - nullMask[nullEntryPos] |= NULL_BITMASKS_WITH_SINGLE_ONE[bitOffsetInEntry]; - } else { - nullMask[nullEntryPos] &= NULL_BITMASKS_WITH_SINGLE_ZERO[bitOffsetInEntry]; - } -} - } // namespace storage } // namespace kuzu diff --git a/src/storage/storage_utils.cpp b/src/storage/storage_utils.cpp index 35a7bc47126..add775bb61b 100644 --- a/src/storage/storage_utils.cpp +++ b/src/storage/storage_utils.cpp @@ -310,7 +310,7 @@ void StorageUtils::initializeListsHeaders(const RelTableSchema* relTableSchema, uint64_t numNodesInTable, const std::string& directory, RelDataDirection relDirection) { auto listHeadersBuilder = make_unique( StorageUtils::getAdjListsFName( - directory, relTableSchema->tableID, relDirection, DBFileType::WAL_VERSION), + directory, relTableSchema->tableID, relDirection, DBFileType::ORIGINAL), numNodesInTable); listHeadersBuilder->saveToDisk(); } diff --git a/src/storage/store/node_table.cpp b/src/storage/store/node_table.cpp index d90d1c4de20..d7103f8d659 100644 --- a/src/storage/store/node_table.cpp +++ b/src/storage/store/node_table.cpp @@ -13,12 +13,19 @@ NodeTable::NodeTable(NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs, initializeData(nodeTableSchema); } -void NodeTable::initializeData(NodeTableSchema* nodeTableSchema) { +std::unordered_map> NodeTable::initializeColumns( + WAL* wal, kuzu::storage::BufferManager* bm, NodeTableSchema* nodeTableSchema) { + std::unordered_map> propertyColumns; for (auto& property : nodeTableSchema->getAllNodeProperties()) { propertyColumns[property.propertyID] = ColumnFactory::getColumn( StorageUtils::getNodePropertyColumnStructureIDAndFName(wal->getDirectory(), property), - property.dataType, &bufferManager, wal); + property.dataType, bm, wal); } + return propertyColumns; +} + +void NodeTable::initializeData(NodeTableSchema* nodeTableSchema) { + propertyColumns = initializeColumns(wal, &bufferManager, nodeTableSchema); if (nodeTableSchema->getPrimaryKey().dataType.getLogicalTypeID() != LogicalTypeID::SERIAL) { pkIndex = std::make_unique( StorageUtils::getNodeIndexIDAndFName(wal->getDirectory(), tableID), @@ -26,13 +33,6 @@ void NodeTable::initializeData(NodeTableSchema* nodeTableSchema) { } } -void NodeTable::resetColumns(catalog::NodeTableSchema* nodeTableSchema) { - for (auto& property : 
nodeTableSchema->getAllNodeProperties()) { - propertyColumns[property.propertyID].reset(); - } - pkIndex.reset(); -} - void NodeTable::scan(transaction::Transaction* transaction, ValueVector* inputIDVector, const std::vector& columnIds, std::vector outputVectors) { assert(columnIds.size() == outputVectors.size()); diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp index d90a6912a90..c5f681a6e7e 100644 --- a/src/storage/store/rel_table.cpp +++ b/src/storage/store/rel_table.cpp @@ -1,7 +1,6 @@ #include "storage/store/rel_table.h" #include "common/string_utils.h" -#include "spdlog/spdlog.h" #include "storage/storage_structure/lists/lists_update_iterator.h" using namespace kuzu::catalog; diff --git a/src/storage/wal_replayer.cpp b/src/storage/wal_replayer.cpp index 3cce10b8bef..46f29d16611 100644 --- a/src/storage/wal_replayer.cpp +++ b/src/storage/wal_replayer.cpp @@ -33,18 +33,11 @@ void WALReplayer::replay() { throw StorageException( "Cannot checkpointInMemory WAL because last logged record is not a commit record."); } - if (isRecovering && !wal->isLastLoggedRecordCommit()) { - throw StorageException("System should not try to rollback when the last logged record is " - "not a commit record."); - } - - if (!wal->isEmptyWAL()) { - auto walIterator = wal->getIterator(); - WALRecord walRecord; - while (walIterator->hasNextRecord()) { - walIterator->getNextRecord(walRecord); - replayWALRecord(walRecord); - } + auto walIterator = wal->getIterator(); + WALRecord walRecord; + while (walIterator->hasNextRecord()) { + walIterator->getNextRecord(walRecord); + replayWALRecord(walRecord); } // We next perform an in-memory checkpointing or rolling back of node/relTables. @@ -108,6 +101,10 @@ void WALReplayer::replayPageUpdateOrInsertRecord(const kuzu::storage::WALRecord& std::unique_ptr fileInfoOfStorageStructure = StorageUtils::getFileInfoForReadWrite(wal->getDirectory(), storageStructureID); if (isCheckpoint) { + if (!wal->isLastLoggedRecordCommit()) { + // Nothing to undo. + return; + } walFileHandle->readPage(pageBuffer.get(), walRecord.pageInsertOrUpdateRecord.pageIdxInWAL); FileUtils::writeToFile(fileInfoOfStorageStructure.get(), pageBuffer.get(), BufferPoolConstants::PAGE_4KB_SIZE, @@ -276,33 +273,17 @@ void WALReplayer::replayOverflowFileNextBytePosRecord(const kuzu::storage::WALRe } void WALReplayer::replayCopyNodeRecord(const kuzu::storage::WALRecord& walRecord) { + auto tableID = walRecord.copyNodeRecord.tableID; if (isCheckpoint) { - auto tableID = walRecord.copyNodeRecord.tableID; if (!isRecovering) { - auto nodeTableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(tableID); - // If the WAL version of the file doesn't exist, we must have already replayed - // this WAL and successfully replaced the original DB file and deleted the WAL - // version but somehow WALReplayer must have failed/crashed before deleting the - // entire WAL (which is why the log record is still here). In that case the - // renaming has already happened, so we do not have to do anything, which is the - // behavior of replaceNodeWithVersionFromWALIfExists, i.e., if the WAL version - // of the file does not exist, it will not do anything. 
- storageManager->getNodesStore().getNodeTable(tableID)->resetColumns(nodeTableSchema); - WALReplayerUtils::replaceNodeFilesWithVersionFromWALIfExists( - nodeTableSchema, wal->getDirectory()); - auto relTableSchemas = catalog->getAllRelTableSchemasContainBoundTable(tableID); - for (auto relTableSchema : relTableSchemas) { - storageManager->getRelsStore() - .getRelTable(relTableSchema->tableID) - ->resetColumnsAndLists(relTableSchema); - } - WALReplayerUtils::replaceListsHeadersFilesWithVersionFromWALIfExists( - relTableSchemas, tableID, wal->getDirectory()); + // CHECKPOINT. // If we are not recovering, i.e., we are checkpointing during normal execution, // then we need to update the nodeTable because the actual columns and lists // files have been changed during checkpoint. So the in memory // fileHandles are obsolete and should be reconstructed (e.g. since the numPages // have likely changed they need to reconstruct their page locks). + auto nodeTableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(tableID); + auto relTableSchemas = catalog->getAllRelTableSchemasContainBoundTable(tableID); storageManager->getNodesStore().getNodeTable(tableID)->initializeData(nodeTableSchema); for (auto relTableSchema : relTableSchemas) { storageManager->getRelsStore() @@ -310,44 +291,56 @@ void WALReplayer::replayCopyNodeRecord(const kuzu::storage::WALRecord& walRecord ->initializeData(relTableSchema); } } else { + // RECOVERY. + if (wal->isLastLoggedRecordCommit()) { + return; + } auto catalogForRecovery = getCatalogForRecovery(DBFileType::ORIGINAL); - // See comments above. - WALReplayerUtils::replaceNodeFilesWithVersionFromWALIfExists( + WALReplayerUtils::createEmptyDBFilesForNewNodeTable( catalogForRecovery->getReadOnlyVersion()->getNodeTableSchema(tableID), wal->getDirectory()); - WALReplayerUtils::replaceListsHeadersFilesWithVersionFromWALIfExists( - catalogForRecovery->getAllRelTableSchemasContainBoundTable(tableID), tableID, - wal->getDirectory()); } } else { - // Since COPY statements are single statements that are auto committed, it is - // impossible for users to roll back a COPY statement. + // ROLLBACK. + WALReplayerUtils::createEmptyDBFilesForNewNodeTable( + catalog->getReadOnlyVersion()->getNodeTableSchema(tableID), wal->getDirectory()); } } void WALReplayer::replayCopyRelRecord(const kuzu::storage::WALRecord& walRecord) { + auto tableID = walRecord.copyRelRecord.tableID; if (isCheckpoint) { - auto tableID = walRecord.copyRelRecord.tableID; if (!isRecovering) { + // CHECKPOINT. storageManager->getRelsStore().getRelTable(tableID)->resetColumnsAndLists( catalog->getReadOnlyVersion()->getRelTableSchema(tableID)); // See comments for COPY_NODE_RECORD. - WALReplayerUtils::replaceRelPropertyFilesWithVersionFromWALIfExists( - catalog->getReadOnlyVersion()->getRelTableSchema(tableID), wal->getDirectory()); - // See comments for COPY_NODE_RECORD. storageManager->getRelsStore().getRelTable(tableID)->initializeData( catalog->getReadOnlyVersion()->getRelTableSchema(tableID)); storageManager->getNodesStore().getNodesStatisticsAndDeletedIDs().setAdjListsAndColumns( &storageManager->getRelsStore()); } else { + // RECOVERY. + if (wal->isLastLoggedRecordCommit()) { + return; + } + auto nodesStatisticsAndDeletedIDsForCheckPointing = + std::make_unique(wal->getDirectory()); + auto maxNodeOffsetPerTable = + nodesStatisticsAndDeletedIDsForCheckPointing->getMaxNodeOffsetPerTable(); auto catalogForRecovery = getCatalogForRecovery(DBFileType::ORIGINAL); - // See comments for COPY_NODE_RECORD. 
- WALReplayerUtils::replaceRelPropertyFilesWithVersionFromWALIfExists( + WALReplayerUtils::createEmptyDBFilesForNewRelTable( catalogForRecovery->getReadOnlyVersion()->getRelTableSchema(tableID), - wal->getDirectory()); + wal->getDirectory(), maxNodeOffsetPerTable); } } else { - // See comments for COPY_NODE_RECORD. + // ROLLBACK. + WALReplayerUtils::createEmptyDBFilesForNewRelTable( + catalog->getReadOnlyVersion()->getRelTableSchema(walRecord.relTableRecord.tableID), + wal->getDirectory(), + storageManager->getNodesStore() + .getNodesStatisticsAndDeletedIDs() + .getMaxNodeOffsetPerTable()); } } @@ -366,6 +359,10 @@ void WALReplayer::replayDropTableRecord(const kuzu::storage::WALRecord& walRecor catalog->getReadOnlyVersion()->getRelTableSchema(tableID), wal->getDirectory()); } } else { + if (!wal->isLastLoggedRecordCommit()) { + // Nothing to undo. + return; + } auto catalogForRecovery = getCatalogForRecovery(DBFileType::ORIGINAL); if (catalogForRecovery->getReadOnlyVersion()->containNodeTable(tableID)) { WALReplayerUtils::removeDBFilesForNodeTable( @@ -398,6 +395,10 @@ void WALReplayer::replayDropPropertyRecord(const kuzu::storage::WALRecord& walRe catalog->getReadOnlyVersion()->getRelTableSchema(tableID), propertyID); } } else { + if (!wal->isLastLoggedRecordCommit()) { + // Nothing to undo. + return; + } auto catalogForRecovery = getCatalogForRecovery(DBFileType::WAL_VERSION); if (catalogForRecovery->getReadOnlyVersion()->containNodeTable(tableID)) { WALReplayerUtils::removeDBFilesForNodeProperty( @@ -431,6 +432,10 @@ void WALReplayer::replayAddPropertyRecord(const kuzu::storage::WALRecord& walRec property, *reinterpret_cast(tableSchema)); } } else { + if (!wal->isLastLoggedRecordCommit()) { + // Nothing to undo. + return; + } auto catalogForRecovery = getCatalogForRecovery(DBFileType::WAL_VERSION); auto tableSchema = catalogForRecovery->getReadOnlyVersion()->getTableSchema(tableID); if (catalogForRecovery->getReadOnlyVersion()->containNodeTable(tableID)) { diff --git a/src/storage/wal_replayer_utils.cpp b/src/storage/wal_replayer_utils.cpp index 6e4d2590446..e5332d331cb 100644 --- a/src/storage/wal_replayer_utils.cpp +++ b/src/storage/wal_replayer_utils.cpp @@ -86,23 +86,6 @@ void WALReplayerUtils::renameDBFilesForRelProperty(const std::string& directory, } } -void WALReplayerUtils::replaceListsHeadersFilesWithVersionFromWALIfExists( - const std::unordered_set& relTableSchemas, table_id_t boundTableID, - const std::string& directory) { - for (auto relTableSchema : relTableSchemas) { - for (auto direction : RelDataDirectionUtils::getRelDataDirections()) { - if (!relTableSchema->isSingleMultiplicityInDirection(direction)) { - auto listsHeadersFileName = - StorageUtils::getListHeadersFName(StorageUtils::getAdjListsFName( - directory, relTableSchema->tableID, direction, DBFileType::ORIGINAL)); - auto walListsHeadersFileName = - StorageUtils::appendWALFileSuffix(listsHeadersFileName); - FileUtils::renameFileIfExists(walListsHeadersFileName, listsHeadersFileName); - } - } - } -} - void WALReplayerUtils::createEmptyDBFilesForRelProperties(RelTableSchema* relTableSchema, const std::string& directory, RelDataDirection relDirection, uint32_t numNodes, bool isForRelPropertyColumn) { diff --git a/test/runner/e2e_copy_transaction_test.cpp b/test/runner/e2e_copy_transaction_test.cpp index ed0a610732c..a89a5215471 100644 --- a/test/runner/e2e_copy_transaction_test.cpp +++ b/test/runner/e2e_copy_transaction_test.cpp @@ -45,10 +45,6 @@ class TinySnbCopyCSVTransactionTest : public EmptyDBTest { void 
validateDatabaseStateBeforeCheckPointCopyNode(table_id_t tableID) { auto nodeTableSchema = (NodeTableSchema*)catalog->getReadOnlyVersion()->getTableSchema(tableID); - // Before checkPointing, we should have two versions of node column and list files. The - // updates to maxNodeOffset should be invisible to read-only transactions. - validateNodeColumnFilesExistence( - nodeTableSchema, DBFileType::WAL_VERSION, true /* existence */); validateNodeColumnFilesExistence( nodeTableSchema, DBFileType::ORIGINAL, true /* existence */); ASSERT_EQ(std::make_unique(database.get()) @@ -65,11 +61,6 @@ class TinySnbCopyCSVTransactionTest : public EmptyDBTest { void validateDatabaseStateAfterCheckPointCopyNode(table_id_t tableID) { auto nodeTableSchema = (NodeTableSchema*)catalog->getReadOnlyVersion()->getTableSchema(tableID); - // After checkPointing, we should only have one version of node column and list - // files(original version). The updates to maxNodeOffset should be visible to read-only - // transaction; - validateNodeColumnFilesExistence( - nodeTableSchema, DBFileType::WAL_VERSION, false /* existence */); validateNodeColumnFilesExistence( nodeTableSchema, DBFileType::ORIGINAL, true /* existence */); validateTinysnbPersonAgeProperty(); @@ -132,9 +123,6 @@ class TinySnbCopyCSVTransactionTest : public EmptyDBTest { void validateDatabaseStateBeforeCheckPointCopyRel(table_id_t tableID) { auto relTableSchema = (RelTableSchema*)catalog->getReadOnlyVersion()->getTableSchema(tableID); - // Before checkPointing, we should have two versions of rel column and list files. - validateRelColumnAndListFilesExistence( - relTableSchema, DBFileType::WAL_VERSION, true /* existence */); validateRelColumnAndListFilesExistence( relTableSchema, DBFileType::ORIGINAL, true /* existence */); auto dummyWriteTrx = transaction::Transaction::getDummyWriteTrx(); @@ -146,10 +134,6 @@ class TinySnbCopyCSVTransactionTest : public EmptyDBTest { void validateDatabaseStateAfterCheckPointCopyRel(table_id_t knowsTableID) { auto relTableSchema = (RelTableSchema*)catalog->getReadOnlyVersion()->getTableSchema(knowsTableID); - // After checkPointing, we should only have one version of rel column and list - // files(original version). - validateRelColumnAndListFilesExistence( - relTableSchema, DBFileType::WAL_VERSION, false /* existence */); validateRelColumnAndListFilesExistence( relTableSchema, DBFileType::ORIGINAL, true /* existence */); validateTinysnbKnowsDateProperty();
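
Reviewer note (illustrative, not part of the patch): the behavioral core of this diff is that COPY now writes its data files as ORIGINAL versions and logs the COPY WAL record up front — CopyNode::executeInternal logs and flushes the record once under the shared-state mutex, and RelCopyExecutor::copy does the same before building any in-memory columns/lists — so that WAL replay can recreate empty DB files whenever the COPY record is not followed by a commit. The following is a minimal, self-contained C++ sketch of that log-once pattern, assuming nothing beyond the diff itself; FakeWAL, SharedState, and logCopyOnce are illustrative stand-ins, not Kuzu's actual classes.

// Sketch of the "log the COPY WAL record exactly once" pattern added to
// CopyNode::executeInternal. Only the locking/flush pattern mirrors the patch;
// the types below are hypothetical stand-ins.
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

struct FakeWAL {
    void logCopyNodeRecord(uint64_t tableID) {
        std::printf("logged COPY record for table %llu\n", (unsigned long long)tableID);
    }
    void flushAllPages() { std::printf("flushed WAL to disk\n"); }
};

struct SharedState {
    std::mutex mtx;
    bool hasLoggedWAL = false;
};

// Every copy worker calls this before touching any data file; only the first
// caller actually logs and flushes the record, so a crash mid-copy still leaves
// a durable COPY record that recovery can react to (by recreating empty files).
void logCopyOnce(SharedState& shared, FakeWAL& wal, uint64_t tableID) {
    std::unique_lock<std::mutex> xLck{shared.mtx};
    if (!shared.hasLoggedWAL) {
        wal.logCopyNodeRecord(tableID);
        wal.flushAllPages();
        shared.hasLoggedWAL = true;
    }
}

int main() {
    SharedState shared;
    FakeWAL wal;
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; i++) {
        workers.emplace_back([&] { logCopyOnce(shared, wal, 7 /* tableID */); });
    }
    for (auto& t : workers) {
        t.join();
    }
    return 0;
}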