diff --git a/src/include/storage/index/hash_index.h b/src/include/storage/index/hash_index.h index 79b7f6366b4..a06f2ce8eae 100644 --- a/src/include/storage/index/hash_index.h +++ b/src/include/storage/index/hash_index.h @@ -7,7 +7,7 @@ #include "hash_index_slot.h" #include "storage/index/hash_index_utils.h" #include "storage/storage_structure/disk_array.h" -#include "storage/storage_structure/disk_overflow_file.h" +#include "storage/storage_structure/overflow_file.h" #include "transaction/transaction.h" namespace kuzu { @@ -51,9 +51,8 @@ class HashIndex final : public OnDiskHashIndex { public: HashIndex(const DBFileIDAndName& dbFileIDAndName, - const std::shared_ptr& fileHandle, - const std::shared_ptr& overflowFile, uint64_t indexPos, - BufferManager& bufferManager, WAL* wal); + const std::shared_ptr& fileHandle, OverflowFileHandle* overflowFileHandle, + uint64_t indexPos, BufferManager& bufferManager, WAL* wal); ~HashIndex() override; @@ -181,7 +180,7 @@ class HashIndex final : public OnDiskHashIndex { std::unique_ptr> headerArray; std::unique_ptr>> pSlots; std::unique_ptr>> oSlots; - std::shared_ptr diskOverflowFile; + OverflowFileHandle* overflowFileHandle; std::unique_ptr> localStorage; std::unique_ptr indexHeaderForReadTrx; std::unique_ptr indexHeaderForWriteTrx; @@ -262,12 +261,12 @@ class PrimaryKeyIndex { void prepareCommit(); void prepareRollback(); BMFileHandle* getFileHandle() { return fileHandle.get(); } - DiskOverflowFile* getDiskOverflowFile() { return overflowFile.get(); } + OverflowFile* getOverflowFile() { return overflowFile.get(); } private: common::PhysicalTypeID keyDataTypeID; std::shared_ptr fileHandle; - std::shared_ptr overflowFile; + std::unique_ptr overflowFile; std::vector> hashIndices; }; diff --git a/src/include/storage/index/hash_index_builder.h b/src/include/storage/index/hash_index_builder.h index aaf2e81bc48..fcd6d1e6d9a 100644 --- a/src/include/storage/index/hash_index_builder.h +++ b/src/include/storage/index/hash_index_builder.h @@ -6,7 +6,7 @@ #include "storage/index/hash_index_slot.h" #include "storage/index/hash_index_utils.h" #include "storage/storage_structure/disk_array.h" -#include "storage/storage_structure/in_mem_file.h" +#include "storage/storage_structure/overflow_file.h" namespace kuzu { namespace storage { @@ -55,7 +55,7 @@ class HashIndexBuilder final : public InMemHashIndex { public: HashIndexBuilder(const std::shared_ptr& handle, - std::unique_ptr overflowFile, uint64_t indexPos, + OverflowFileHandle* overflowFileHandle, uint64_t indexPos, common::PhysicalTypeID keyDataType); public: @@ -116,7 +116,7 @@ class HashIndexBuilder final : public InMemHashIndex { private: std::shared_ptr fileHandle; - std::unique_ptr inMemOverflowFile; + OverflowFileHandle* overflowFileHandle; std::unique_ptr> headerArray; std::unique_ptr>> pSlots; std::unique_ptr>> oSlots; @@ -181,7 +181,7 @@ class PrimaryKeyIndexBuilder { private: common::PhysicalTypeID keyDataTypeID; - std::atomic overflowPageCounter; + std::unique_ptr overflowFile; std::vector> hashIndexBuilders; }; diff --git a/src/include/storage/storage_structure/disk_overflow_file.h b/src/include/storage/storage_structure/disk_overflow_file.h deleted file mode 100644 index 39e1be23de2..00000000000 --- a/src/include/storage/storage_structure/disk_overflow_file.h +++ /dev/null @@ -1,83 +0,0 @@ -#pragma once - -#include - -#include "common/constants.h" -#include "common/types/types.h" -#include "storage/buffer_manager/buffer_manager.h" -#include "storage/storage_utils.h" -#include "storage/wal/wal.h" -#include "transaction/transaction.h" - -namespace kuzu { -namespace storage { - -class DiskOverflowFile { - -public: - DiskOverflowFile(const DBFileIDAndName& dbFileIdAndName, BufferManager* bufferManager, WAL* wal, - bool readOnly, common::VirtualFileSystem* vfs) - : bufferManager{bufferManager}, wal{wal}, loggedNewOverflowFileNextBytePosRecord{false} { - auto overflowFileIDAndName = constructDBFileIDAndName(dbFileIdAndName); - dbFileID = overflowFileIDAndName.dbFileID; - fileHandle = bufferManager->getBMFileHandle(overflowFileIDAndName.fName, - readOnly ? FileHandle::O_PERSISTENT_FILE_READ_ONLY : - FileHandle::O_PERSISTENT_FILE_NO_CREATE, - BMFileHandle::FileVersionedType::VERSIONED_FILE, vfs); - nextPosToWriteTo.elemPosInPage = 0; - nextPosToWriteTo.pageIdx = fileHandle->getNumPages(); - } - - std::string readString(transaction::TransactionType trxType, const common::ku_string_t& str); - - common::ku_string_t writeString(std::string_view rawString); - inline common::ku_string_t writeString(const char* rawString) { - return writeString(std::string_view(rawString)); - } - - inline BMFileHandle* getFileHandle() { return fileHandle.get(); } - inline void resetNextBytePosToWriteTo(uint64_t nextBytePosToWriteTo_) { - nextPosToWriteTo.elemPosInPage = - nextBytePosToWriteTo_ % common::BufferPoolConstants::PAGE_4KB_SIZE; - nextPosToWriteTo.pageIdx = - nextBytePosToWriteTo_ / common::BufferPoolConstants::PAGE_4KB_SIZE; - } - void resetLoggedNewOverflowFileNextBytePosRecord() { - loggedNewOverflowFileNextBytePosRecord = false; - } - -private: - static inline DBFileIDAndName constructDBFileIDAndName( - const DBFileIDAndName& dbFileIdAndNameForMainDBFile) { - DBFileIDAndName copy = dbFileIdAndNameForMainDBFile; - copy.dbFileID.isOverflow = true; - copy.fName = StorageUtils::getOverflowFileName(dbFileIdAndNameForMainDBFile.fName); - return copy; - } - -private: - bool addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend); - void setStringOverflowWithoutLock( - const char* inMemSrcStr, uint64_t len, common::ku_string_t& diskDstString); - void logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock(); - -private: - static const common::page_idx_t END_OF_PAGE = - common::BufferPoolConstants::PAGE_4KB_SIZE - sizeof(common::page_idx_t); - DBFileID dbFileID; - std::unique_ptr fileHandle; - BufferManager* bufferManager; - WAL* wal; - // This is the index of the last free byte to which we can write. - PageCursor nextPosToWriteTo; - // Mtx is obtained if multiple threads want to write to the overflows to coordinate them. - // We cannot directly coordinate using the locks of the pages in the overflow fileHandle. - // This is because multiple threads will be trying to edit the nextBytePosToWriteTo. - // For simplicity, currently only a single thread can update overflow pages at ant time. See the - // note in writeStringOverflowAndUpdateOverflowPtr for more details. - std::mutex mtx; - bool loggedNewOverflowFileNextBytePosRecord; -}; - -} // namespace storage -} // namespace kuzu diff --git a/src/include/storage/storage_structure/in_mem_file.h b/src/include/storage/storage_structure/in_mem_file.h deleted file mode 100644 index e6945c1bfc1..00000000000 --- a/src/include/storage/storage_structure/in_mem_file.h +++ /dev/null @@ -1,55 +0,0 @@ -#pragma once - -#include - -#include "common/constants.h" -#include "common/types/ku_string.h" -#include "common/types/types.h" -#include "storage/storage_structure/in_mem_page.h" -#include "storage/storage_utils.h" - -namespace kuzu { -namespace common { -class VirtualFileSystem; -} - -namespace storage { - -// InMemFile holds a collection of in-memory page in the memory. -class InMemFile { -public: - explicit InMemFile( - std::shared_ptr fileInfo, std::atomic& pageCounter); - - void addANewPage(); - - inline InMemPage* getPage(common::page_idx_t pageIdx) const { return pages.at(pageIdx).get(); } - - // This function appends a string if the length of the string is larger than SHORT_STR_LENGTH, - // otherwise, construct the ku_string for the rawString and return it. - // Note that this function is not thread safe. - void appendString(std::string_view rawString, common::ku_string_t& result); - inline void appendString(const char* rawString, common::ku_string_t& result) { - appendString(std::string_view(rawString), result); - } - - std::string readString(common::ku_string_t* strInInMemOvfFile) const; - bool equals(std::string_view keyToLookup, const common::ku_string_t& keyInEntry) const; - - void flush(); - - inline common::page_idx_t getNextPageIndex(common::page_idx_t currentPageIndex) const { - return *(common::page_idx_t*)(getPage(currentPageIndex)->data + END_OF_PAGE); - } - -private: - static const uint64_t END_OF_PAGE = - common::BufferPoolConstants::PAGE_4KB_SIZE - sizeof(common::page_idx_t); - std::shared_ptr fileInfo; - std::unordered_map> pages; - PageCursor nextPosToAppend; - std::atomic& pageCounter; -}; - -} // namespace storage -} // namespace kuzu diff --git a/src/include/storage/storage_structure/overflow_file.h b/src/include/storage/storage_structure/overflow_file.h new file mode 100644 index 00000000000..d5d16a3038f --- /dev/null +++ b/src/include/storage/storage_structure/overflow_file.h @@ -0,0 +1,159 @@ +#pragma once + +#include + +#include +#include +#include +#include + +#include "common/cast.h" +#include "common/constants.h" +#include "common/types/types.h" +#include "storage/buffer_manager/bm_file_handle.h" +#include "storage/buffer_manager/buffer_manager.h" +#include "storage/file_handle.h" +#include "storage/index/hash_index_utils.h" +#include "storage/storage_structure/in_mem_page.h" +#include "storage/storage_utils.h" +#include "storage/wal/wal.h" +#include "storage/wal/wal_record.h" +#include "transaction/transaction.h" + +namespace kuzu { +namespace storage { + +class OverflowFile; + +class OverflowFileHandle { + +public: + OverflowFileHandle(OverflowFile& overflowFile, PageCursor& nextPosToWriteTo) + : nextPosToWriteTo(nextPosToWriteTo), overflowFile{overflowFile} {} + // The OverflowFile stores the handles and returns pointers to them. + // Moving the handle would invalidate those pointers + OverflowFileHandle(OverflowFileHandle&& other) = delete; + + std::string readString(transaction::TransactionType trxType, const common::ku_string_t& str); + + bool equals(transaction::TransactionType trxType, std::string_view keyToLookup, + const common::ku_string_t& keyInEntry) const; + + common::ku_string_t writeString(std::string_view rawString); + inline common::ku_string_t writeString(const char* rawString) { + return writeString(std::string_view(rawString)); + } + + void prepareCommit(); + inline void checkpointInMemory() { pageCache.clear(); } + inline void rollbackInMemory(PageCursor nextPosToWriteTo) { + pageCache.clear(); + this->nextPosToWriteTo = nextPosToWriteTo; + } + +private: + uint8_t* addANewPage(); + void setStringOverflow( + const char* inMemSrcStr, uint64_t len, common::ku_string_t& diskDstString); + + inline void read(transaction::TransactionType trxType, common::page_idx_t pageIdx, + const std::function& func) const; + +private: + static const common::page_idx_t END_OF_PAGE = + common::BufferPoolConstants::PAGE_4KB_SIZE - sizeof(common::page_idx_t); + // This is the index of the last free byte to which we can write. + PageCursor& nextPosToWriteTo; + OverflowFile& overflowFile; + // Cached pages which have been written in the current transaction + std::unordered_map> pageCache; +}; + +// Stores the current state of the overflow file +// The number of pages in use are stored here so that we can write new pages directly +// to the overflow file, and in the case of an interruption and rollback the header will +// still record the correct place in the file to allocate new pages +struct StringOverflowFileHeader { + common::page_idx_t pages; + PageCursor cursors[NUM_HASH_INDEXES]; + + StringOverflowFileHeader() : pages{1} { + std::fill(cursors, cursors + NUM_HASH_INDEXES, PageCursor()); + } +}; + +class OverflowFile { +public: + // For reading an existing overflow file + OverflowFile(const DBFileIDAndName& dbFileIdAndName, BufferManager* bufferManager, WAL* wal, + bool readOnly, common::VirtualFileSystem* vfs); + + // For creating an overflow file from scratch + OverflowFile(const std::string& fName, common::VirtualFileSystem* vfs) + : numPagesOnDisk{0}, fileHandle{std::make_unique( + fName, FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS, vfs)}, + bufferManager{nullptr}, wal{nullptr}, pageCounter{0}, headerChanged{true} { + // Reserve a page for the header + getNewPageIdx(); + } + + // Handles contain a reference to the overflow file + OverflowFile(OverflowFile&& other) = delete; + + void rollbackInMemory(); + void prepareCommit(); + void checkpointInMemory(); + + static inline DBFileIDAndName constructDBFileIDAndName( + const DBFileIDAndName& dbFileIdAndNameForMainDBFile) { + DBFileIDAndName copy = dbFileIdAndNameForMainDBFile; + copy.dbFileID.isOverflow = true; + copy.fName = StorageUtils::getOverflowFileName(dbFileIdAndNameForMainDBFile.fName); + return copy; + } + + inline OverflowFileHandle* addHandle() { + KU_ASSERT(handles.size() < NUM_HASH_INDEXES); + handles.emplace_back( + std::make_unique(*this, header.cursors[handles.size()])); + return handles.back().get(); + } + + inline common::page_idx_t getNewPageIdx() { + // If this isn't the first call reserving the page header, then the header flag must be set + // prior to this + KU_ASSERT(pageCounter == 0 || headerChanged); + return pageCounter.fetch_add(1); + } + + inline void setHeaderChanged() { headerChanged = true; } + + inline BufferManager& getBM() { return *bufferManager; } + + inline WAL& getWAL() { return *wal; } + + inline BMFileHandle* getBMFileHandle() const { + return common::ku_dynamic_cast(fileHandle.get()); + } + + inline DBFileID& getFileID() { return dbFileID; } + + void readFromDisk(transaction::TransactionType trxType, common::page_idx_t pageIdx, + const std::function& func) const; + + // Writes new pages directly to the file and existing pages to the WAL + void writePageToDisk(common::page_idx_t pageIdx, uint8_t* data) const; + +private: + std::vector> handles; + StringOverflowFileHeader header; + common::page_idx_t numPagesOnDisk; + DBFileID dbFileID; + std::unique_ptr fileHandle; + BufferManager* bufferManager; + WAL* wal; + std::atomic pageCounter; + std::atomic headerChanged; +}; +} // namespace storage +} // namespace kuzu diff --git a/src/include/storage/wal/wal_record.h b/src/include/storage/wal/wal_record.h index ef11cd24310..018f55c9bf9 100644 --- a/src/include/storage/wal/wal_record.h +++ b/src/include/storage/wal/wal_record.h @@ -51,9 +51,6 @@ enum class WALRecordType : uint8_t { CREATE_TABLE_RECORD = 6, CREATE_REL_TABLE_GROUP_RECORD = 7, CREATE_RDF_GRAPH_RECORD = 8, - // Records the nextBytePosToWriteTo field's last value before the write trx started. This is - // used when rolling back to restore this value. - OVERFLOW_FILE_NEXT_BYTE_POS_RECORD = 17, COPY_TABLE_RECORD = 19, DROP_TABLE_RECORD = 20, DROP_PROPERTY_RECORD = 21, @@ -131,20 +128,6 @@ struct RdfGraphRecord { } }; -struct DiskOverflowFileNextBytePosRecord { - DBFileID dbFileID; - uint64_t prevNextBytePosToWriteTo; - - DiskOverflowFileNextBytePosRecord() = default; - - DiskOverflowFileNextBytePosRecord(DBFileID dbFileID, uint64_t prevNextByteToWriteTo) - : dbFileID{dbFileID}, prevNextBytePosToWriteTo{prevNextByteToWriteTo} {} - - inline bool operator==(const DiskOverflowFileNextBytePosRecord& rhs) const { - return dbFileID == rhs.dbFileID && prevNextBytePosToWriteTo == rhs.prevNextBytePosToWriteTo; - } -}; - struct CopyTableRecord { common::table_id_t tableID; @@ -213,7 +196,6 @@ struct WALRecord { CommitRecord commitRecord; CreateTableRecord createTableRecord; RdfGraphRecord rdfGraphRecord; - DiskOverflowFileNextBytePosRecord diskOverflowFileNextBytePosRecord; CopyTableRecord copyTableRecord; TableStatisticsRecord tableStatisticsRecord; DropTableRecord dropTableRecord; @@ -234,8 +216,6 @@ struct WALRecord { static WALRecord newRdfGraphRecord(common::table_id_t rdfGraphID, common::table_id_t resourceTableID, common::table_id_t literalTableID, common::table_id_t resourceTripleTableID, common::table_id_t literalTripleTableID); - static WALRecord newOverflowFileNextBytePosRecord( - DBFileID dbFileID, uint64_t prevNextByteToWriteTo_); static WALRecord newCopyTableRecord(common::table_id_t tableID); static WALRecord newDropTableRecord(common::table_id_t tableID); static WALRecord newDropPropertyRecord( diff --git a/src/include/storage/wal_replayer.h b/src/include/storage/wal_replayer.h index 6be113cb76e..2c7fb789982 100644 --- a/src/include/storage/wal_replayer.h +++ b/src/include/storage/wal_replayer.h @@ -33,7 +33,6 @@ class WALReplayer { void replayCatalogRecord(); void replayCreateTableRecord(const WALRecord& walRecord); void replayRdfGraphRecord(const WALRecord& walRecord); - void replayOverflowFileNextBytePosRecord(const WALRecord& walRecord); void replayCopyTableRecord(const WALRecord& walRecord); void replayDropTableRecord(const WALRecord& walRecord); void replayDropPropertyRecord(const WALRecord& walRecord); diff --git a/src/storage/index/hash_index.cpp b/src/storage/index/hash_index.cpp index c905eda99fe..6e3911217e8 100644 --- a/src/storage/index/hash_index.cpp +++ b/src/storage/index/hash_index.cpp @@ -83,9 +83,6 @@ class HashIndexLocalStorage { } } -public: - std::shared_mutex localStorageSharedMutex; - private: // When the storage type is string, allow the key type to be string_view with a custom hash // function @@ -97,11 +94,10 @@ class HashIndexLocalStorage { template HashIndex::HashIndex(const DBFileIDAndName& dbFileIDAndName, - const std::shared_ptr& fileHandle, - const std::shared_ptr& overflowFile, uint64_t indexPos, - BufferManager& bufferManager, WAL* wal) + const std::shared_ptr& fileHandle, OverflowFileHandle* overflowFileHandle, + uint64_t indexPos, BufferManager& bufferManager, WAL* wal) : dbFileIDAndName{dbFileIDAndName}, bm{bufferManager}, wal{wal}, fileHandle(fileHandle), - diskOverflowFile(overflowFile) { + overflowFileHandle(overflowFileHandle) { headerArray = std::make_unique>(*fileHandle, dbFileIDAndName.dbFileID, NUM_HEADER_PAGES * indexPos + INDEX_HEADER_ARRAY_HEADER_PAGE_IDX, &bm, wal, Transaction::getDummyReadOnlyTrx().get()); @@ -235,7 +231,7 @@ template<> inline common::hash_t HashIndex::hashStored( transaction::TransactionType /*trxType*/, const ku_string_t& key) const { common::hash_t hash; - auto str = diskOverflowFile->readString(TransactionType::WRITE, key); + auto str = overflowFileHandle->readString(TransactionType::WRITE, key); function::Hash::operation(str, hash); return hash; } @@ -255,7 +251,6 @@ entry_pos_t HashIndex::findMatchedEntryInSlot( template void HashIndex::prepareCommit() { - std::unique_lock xlock{localStorage->localStorageSharedMutex}; if (localStorage->hasUpdates()) { wal->addToUpdatedTables(dbFileIDAndName.dbFileID.nodeIndexID.tableID); localStorage->applyLocalChanges( @@ -267,7 +262,6 @@ void HashIndex::prepareCommit() { template void HashIndex::prepareRollback() { - std::unique_lock xlock{localStorage->localStorageSharedMutex}; if (localStorage->hasUpdates()) { wal->addToUpdatedTables(dbFileIDAndName.dbFileID.nodeIndexID.tableID); } @@ -283,6 +277,9 @@ void HashIndex::checkpointInMemory() { pSlots->checkpointInMemoryIfNecessary(); oSlots->checkpointInMemoryIfNecessary(); localStorage->clear(); + if constexpr (std::same_as) { + overflowFileHandle->checkpointInMemory(); + } } template @@ -301,7 +298,7 @@ template<> inline bool HashIndex::equals(transaction::TransactionType trxType, std::string_view keyToLookup, const ku_string_t& keyInEntry) const { if (HashIndexUtils::areStringPrefixAndLenEqual(keyToLookup, keyInEntry)) { - auto entryKeyString = diskOverflowFile->readString(trxType, keyInEntry); + auto entryKeyString = overflowFileHandle->readString(trxType, keyInEntry); return memcmp(keyToLookup.data(), entryKeyString.c_str(), entryKeyString.length()) == 0; } return false; @@ -310,7 +307,7 @@ inline bool HashIndex::equals(transaction::Transa template<> inline void HashIndex::insert( std::string_view key, uint8_t* entry, common::offset_t offset) { - auto kuString = diskOverflowFile->writeString(key); + auto kuString = overflowFileHandle->writeString(key); memcpy(entry, &kuString, NUM_BYTES_FOR_STRING_KEY); memcpy(entry + NUM_BYTES_FOR_STRING_KEY, &offset, sizeof(common::offset_t)); } @@ -389,7 +386,7 @@ PrimaryKeyIndex::PrimaryKeyIndex(const DBFileIDAndName& dbFileIDAndName, bool re if (keyDataTypeID == PhysicalTypeID::STRING) { overflowFile = - std::make_shared(dbFileIDAndName, &bufferManager, wal, readOnly, vfs); + std::make_unique(dbFileIDAndName, &bufferManager, wal, readOnly, vfs); } hashIndices.reserve(NUM_HASH_INDEXES); @@ -397,12 +394,12 @@ PrimaryKeyIndex::PrimaryKeyIndex(const DBFileIDAndName& dbFileIDAndName, bool re if constexpr (std::is_same_v) { for (auto i = 0u; i < NUM_HASH_INDEXES; i++) { hashIndices.push_back(std::make_unique>( - dbFileIDAndName, fileHandle, overflowFile, i, bufferManager, wal)); + dbFileIDAndName, fileHandle, overflowFile->addHandle(), i, bufferManager, wal)); } } else if constexpr (HashablePrimitive) { for (auto i = 0u; i < NUM_HASH_INDEXES; i++) { hashIndices.push_back(std::make_unique>( - dbFileIDAndName, fileHandle, overflowFile, i, bufferManager, wal)); + dbFileIDAndName, fileHandle, nullptr, i, bufferManager, wal)); } } else { KU_UNREACHABLE; @@ -456,18 +453,27 @@ void PrimaryKeyIndex::checkpointInMemory() { for (auto i = 0u; i < NUM_HASH_INDEXES; i++) { hashIndices[i]->checkpointInMemory(); } + if (overflowFile) { + overflowFile->checkpointInMemory(); + } } void PrimaryKeyIndex::rollbackInMemory() { for (auto i = 0u; i < NUM_HASH_INDEXES; i++) { hashIndices[i]->rollbackInMemory(); } + if (overflowFile) { + overflowFile->rollbackInMemory(); + } } void PrimaryKeyIndex::prepareCommit() { for (auto i = 0u; i < NUM_HASH_INDEXES; i++) { hashIndices[i]->prepareCommit(); } + if (overflowFile) { + overflowFile->prepareCommit(); + } } void PrimaryKeyIndex::prepareRollback() { diff --git a/src/storage/index/hash_index_builder.cpp b/src/storage/index/hash_index_builder.cpp index 0584d005964..3499267eeed 100644 --- a/src/storage/index/hash_index_builder.cpp +++ b/src/storage/index/hash_index_builder.cpp @@ -9,7 +9,7 @@ #include "common/types/types.h" #include "storage/index/hash_index_slot.h" #include "storage/index/hash_index_utils.h" -#include "storage/storage_structure/in_mem_file.h" +#include "storage/storage_structure/overflow_file.h" using namespace kuzu::common; @@ -18,8 +18,8 @@ namespace storage { template HashIndexBuilder::HashIndexBuilder(const std::shared_ptr& fileHandle, - std::unique_ptr overflowFile, uint64_t indexPos, PhysicalTypeID keyDataType) - : fileHandle(fileHandle), inMemOverflowFile(std::move(overflowFile)) { + OverflowFileHandle* overflowFileHandle, uint64_t indexPos, PhysicalTypeID keyDataType) + : fileHandle(fileHandle), overflowFileHandle(overflowFileHandle) { this->indexHeader = std::make_unique(keyDataType); headerArray = std::make_unique>(*fileHandle, NUM_HEADER_PAGES * indexPos + INDEX_HEADER_ARRAY_HEADER_PAGE_IDX, 0 /* numElements */); @@ -195,9 +195,6 @@ void HashIndexBuilder::flush() { headerArray->saveToDisk(); pSlots->saveToDisk(); oSlots->saveToDisk(); - if constexpr (std::is_same_v) { - inMemOverflowFile->flush(); - } } template @@ -214,7 +211,7 @@ template<> void HashIndexBuilder::insert(std::string_view key, Slot* slot, uint8_t entryPos, offset_t offset, uint8_t fingerprint) { auto entry = slot->entries[entryPos].data; - inMemOverflowFile->appendString(key, *(ku_string_t*)entry); + *(ku_string_t*)entry = overflowFileHandle->writeString(key); memcpy(entry + NUM_BYTES_FOR_STRING_KEY, &offset, sizeof(common::offset_t)); slot->header.setEntryValid(entryPos, fingerprint); } @@ -228,7 +225,8 @@ template<> common::hash_t HashIndexBuilder::hashStored( const ku_string_t& key) const { auto kuString = key; - return HashIndexUtils::hash(inMemOverflowFile->readString(&kuString)); + return HashIndexUtils::hash( + overflowFileHandle->readString(transaction::TransactionType::WRITE, kuString)); } template<> @@ -247,7 +245,8 @@ bool HashIndexBuilder::equals( return memcmp(keyToLookup.data(), keyInEntry.prefix, keyInEntry.len) == 0; } else { // For long strings, compare with overflow data - return inMemOverflowFile->equals(keyToLookup, keyInEntry); + return overflowFileHandle->equals( + transaction::TransactionType::WRITE, keyToLookup, keyInEntry); } } @@ -266,7 +265,7 @@ template class HashIndexBuilder; PrimaryKeyIndexBuilder::PrimaryKeyIndexBuilder( const std::string& fName, PhysicalTypeID keyDataType, VirtualFileSystem* vfs) - : keyDataTypeID{keyDataType}, overflowPageCounter(0) { + : keyDataTypeID{keyDataType} { auto fileHandle = std::make_shared(fName, FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS, vfs); fileHandle->addNewPages(NUM_HEADER_PAGES * NUM_HASH_INDEXES); @@ -275,20 +274,17 @@ PrimaryKeyIndexBuilder::PrimaryKeyIndexBuilder( keyDataTypeID, [&](T) { if constexpr (std::is_same_v) { - auto overflowFileInfo = std::shared_ptr(vfs->openFile( - StorageUtils::getOverflowFileName(fileHandle->getFileInfo()->path), - O_CREAT | O_WRONLY)); + overflowFile = std::make_unique( + StorageUtils::getOverflowFileName(fileHandle->getFileInfo()->path), vfs); for (auto i = 0u; i < NUM_HASH_INDEXES; i++) { - auto overflowFile = - std::make_unique(overflowFileInfo, overflowPageCounter); hashIndexBuilders.push_back( std::make_unique>( - fileHandle, std::move(overflowFile), i, keyDataType)); + fileHandle, overflowFile->addHandle(), i, keyDataType)); } } else if constexpr (HashablePrimitive) { for (auto i = 0u; i < NUM_HASH_INDEXES; i++) { - hashIndexBuilders.push_back(std::make_unique>( - fileHandle, std::unique_ptr(), i, keyDataType)); + hashIndexBuilders.push_back( + std::make_unique>(fileHandle, nullptr, i, keyDataType)); } } else { KU_UNREACHABLE; @@ -305,6 +301,9 @@ void PrimaryKeyIndexBuilder::bulkReserve(uint32_t numEntries) { } void PrimaryKeyIndexBuilder::flush() { + if (overflowFile) { + overflowFile->prepareCommit(); + } for (auto i = 0u; i < NUM_HASH_INDEXES; i++) { hashIndexBuilders[i]->flush(); } diff --git a/src/storage/storage_structure/CMakeLists.txt b/src/storage/storage_structure/CMakeLists.txt index 5b86666bedc..479d01c47f0 100644 --- a/src/storage/storage_structure/CMakeLists.txt +++ b/src/storage/storage_structure/CMakeLists.txt @@ -1,8 +1,7 @@ add_library(kuzu_storage_structure OBJECT disk_array.cpp - disk_overflow_file.cpp - in_mem_file.cpp + overflow_file.cpp in_mem_page.cpp db_file_utils.cpp) diff --git a/src/storage/storage_structure/disk_overflow_file.cpp b/src/storage/storage_structure/disk_overflow_file.cpp deleted file mode 100644 index 37a0091a1cb..00000000000 --- a/src/storage/storage_structure/disk_overflow_file.cpp +++ /dev/null @@ -1,126 +0,0 @@ -#include "storage/storage_structure/disk_overflow_file.h" - -#include - -#include "common/constants.h" -#include "common/exception/message.h" -#include "common/type_utils.h" -#include "common/types/types.h" -#include "storage/storage_structure/db_file_utils.h" -#include "storage/storage_utils.h" - -using lock_t = std::unique_lock; - -using namespace kuzu::transaction; -using namespace kuzu::common; - -namespace kuzu { -namespace storage { - -std::string DiskOverflowFile::readString(TransactionType trxType, const ku_string_t& str) { - if (ku_string_t::isShortString(str.len)) { - return str.getAsShortString(); - } else { - PageCursor cursor; - TypeUtils::decodeOverflowPtr(str.overflowPtr, cursor.pageIdx, cursor.elemPosInPage); - std::string retVal; - retVal.reserve(str.len); - int32_t remainingLength = str.len; - while (remainingLength > 0) { - auto [fileHandleToPin, pageIdxToPin] = - DBFileUtils::getFileHandleAndPhysicalPageIdxToPin( - *fileHandle, cursor.pageIdx, *wal, trxType); - auto numBytesToReadInPage = std::min( - static_cast(remainingLength), END_OF_PAGE - cursor.elemPosInPage); - page_idx_t nextPage; - auto startPosInSrc = retVal.size(); - bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, [&](uint8_t* frame) { - // Replace rather than append, since optimistic read may call the function multiple - // times - retVal.replace(startPosInSrc, numBytesToReadInPage, - std::string_view(reinterpret_cast(frame) + cursor.elemPosInPage, - numBytesToReadInPage)); - nextPage = *(page_idx_t*)(frame + END_OF_PAGE); - }); - remainingLength -= numBytesToReadInPage; - // After the first page we always start reading from the beginning of the page. - cursor.elemPosInPage = 0; - KU_ASSERT(nextPage < fileHandle->getNumPages()); - cursor.pageIdx = nextPage; - } - return retVal; - } -} - -bool DiskOverflowFile::addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend) { - if ((nextPosToWriteTo.elemPosInPage == 0) || - ((nextPosToWriteTo.elemPosInPage + numBytesToAppend - 1) > END_OF_PAGE)) { - page_idx_t newPageIdx = - DBFileUtils::insertNewPage(*fileHandle, dbFileID, *bufferManager, *wal); - // Write new page index to end of previous page - if (nextPosToWriteTo.pageIdx > 0) { - DBFileUtils::updatePage(*fileHandle, dbFileID, nextPosToWriteTo.pageIdx - 1, - false /* existing page */, *bufferManager, *wal, - [&](auto frame) { memcpy(frame + END_OF_PAGE, &newPageIdx, sizeof(page_idx_t)); }); - } - return true; - } - return false; -} - -void DiskOverflowFile::setStringOverflowWithoutLock( - const char* srcRawString, uint64_t len, ku_string_t& diskDstString) { - if (len <= ku_string_t::SHORT_STR_LENGTH) { - return; - } else if (len > BufferPoolConstants::PAGE_256KB_SIZE) { - if constexpr (StorageConstants::TRUNCATE_OVER_LARGE_STRINGS) { - len = BufferPoolConstants::PAGE_256KB_SIZE; - diskDstString.len = len; - } else { - throw RuntimeException(ExceptionMessage::overLargeStringPKValueException(len)); - } - } - int32_t remainingLength = len; - TypeUtils::encodeOverflowPtr( - diskDstString.overflowPtr, nextPosToWriteTo.pageIdx, nextPosToWriteTo.elemPosInPage); - while (remainingLength > 0) { - auto bytesWritten = len - remainingLength; - auto numBytesToWriteInPage = std::min( - static_cast(remainingLength), END_OF_PAGE - nextPosToWriteTo.elemPosInPage); - bool insertingNewPage = addNewPageIfNecessaryWithoutLock(remainingLength); - DBFileUtils::updatePage(*fileHandle, dbFileID, nextPosToWriteTo.pageIdx, insertingNewPage, - *bufferManager, *wal, [&](auto frame) { - memcpy(frame + nextPosToWriteTo.elemPosInPage, srcRawString + bytesWritten, - numBytesToWriteInPage); - }); - remainingLength -= numBytesToWriteInPage; - nextPosToWriteTo.elemPosInPage += numBytesToWriteInPage; - if (nextPosToWriteTo.elemPosInPage >= END_OF_PAGE) { - nextPosToWriteTo.nextPage(); - } - } -} - -ku_string_t DiskOverflowFile::writeString(std::string_view rawString) { - lock_t lck{mtx}; - ku_string_t result; - result.len = rawString.length(); - auto shortStrLen = ku_string_t::SHORT_STR_LENGTH; - auto inlineLen = std::min(shortStrLen, (uint64_t)result.len); - memcpy(result.prefix, rawString.data(), inlineLen); - logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock(); - setStringOverflowWithoutLock(rawString.data(), result.len, result); - return result; -} - -void DiskOverflowFile::logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock() { - if (!loggedNewOverflowFileNextBytePosRecord) { - loggedNewOverflowFileNextBytePosRecord = true; - wal->logOverflowFileNextBytePosRecord( - dbFileID, nextPosToWriteTo.pageIdx * BufferPoolConstants::PAGE_4KB_SIZE + - nextPosToWriteTo.elemPosInPage); - } -} - -} // namespace storage -} // namespace kuzu diff --git a/src/storage/storage_structure/in_mem_file.cpp b/src/storage/storage_structure/in_mem_file.cpp deleted file mode 100644 index 9d4c1d21075..00000000000 --- a/src/storage/storage_structure/in_mem_file.cpp +++ /dev/null @@ -1,128 +0,0 @@ -#include "storage/storage_structure/in_mem_file.h" - -#include "common/constants.h" -#include "common/exception/copy.h" -#include "common/exception/message.h" -#include "common/type_utils.h" -#include "common/types/ku_string.h" -#include "common/types/types.h" - -using namespace kuzu::common; - -namespace kuzu { -namespace storage { - -InMemFile::InMemFile(std::shared_ptr fileInfo, std::atomic& pageCounter) - : fileInfo{fileInfo}, nextPosToAppend(0, 0), pageCounter(pageCounter) {} - -void InMemFile::addANewPage() { - page_idx_t newPageIdx = pageCounter.fetch_add(1); - // Write the index of the next page to the end of the current page, in case a string overflows - // from one page to another This is always done, as strings are unlikely to end exactly at the - // end of a page and it's probably not worth the additional complexity to save a few bytes on - // the occasions that a string does - if (pages.size() > 0) { - pages[nextPosToAppend.pageIdx]->write( - END_OF_PAGE, reinterpret_cast(&newPageIdx), sizeof(page_idx_t)); - } - pages.emplace(newPageIdx, std::make_unique()); - nextPosToAppend.elemPosInPage = 0; - nextPosToAppend.pageIdx = newPageIdx; -} - -void InMemFile::appendString(std::string_view rawString, ku_string_t& result) { - auto length = rawString.length(); - result.len = length; - std::memcpy(result.prefix, rawString.data(), - length <= ku_string_t::SHORT_STR_LENGTH ? length : ku_string_t::PREFIX_LENGTH); - if (length > ku_string_t::SHORT_STR_LENGTH) { - if (length > BufferPoolConstants::PAGE_256KB_SIZE) { - if constexpr (StorageConstants::TRUNCATE_OVER_LARGE_STRINGS) { - length = BufferPoolConstants::PAGE_256KB_SIZE; - result.len = length; - } else { - throw CopyException(ExceptionMessage::overLargeStringPKValueException(length)); - } - } - if (pages.size() == 0) { - addANewPage(); - } - - int32_t remainingLength = length; - // There should always be some space to write. The constructor adds an empty page, and - // we always add new pages if we run out of space at the end of the following loop - KU_ASSERT(nextPosToAppend.elemPosInPage < END_OF_PAGE); - - TypeUtils::encodeOverflowPtr( - result.overflowPtr, nextPosToAppend.pageIdx, nextPosToAppend.elemPosInPage); - while (remainingLength > 0) { - auto numBytesToWriteInPage = std::min(static_cast(remainingLength), - END_OF_PAGE - nextPosToAppend.elemPosInPage); - pages[nextPosToAppend.pageIdx]->write(nextPosToAppend.elemPosInPage, - reinterpret_cast(rawString.data()) + (length - remainingLength), - numBytesToWriteInPage); - remainingLength -= numBytesToWriteInPage; - // Allocate a new page if necessary. - nextPosToAppend.elemPosInPage += numBytesToWriteInPage; - if (nextPosToAppend.elemPosInPage >= END_OF_PAGE) { - addANewPage(); - } - } - } -} - -std::string InMemFile::readString(ku_string_t* strInInMemOvfFile) const { - auto length = strInInMemOvfFile->len; - if (ku_string_t::isShortString(length)) { - return strInInMemOvfFile->getAsShortString(); - } else { - PageCursor cursor; - TypeUtils::decodeOverflowPtr( - strInInMemOvfFile->overflowPtr, cursor.pageIdx, cursor.elemPosInPage); - std::string result; - result.reserve(length); - auto remainingLength = length; - while (remainingLength > 0) { - auto numBytesToReadInPage = std::min( - static_cast(remainingLength), END_OF_PAGE - cursor.elemPosInPage); - result += - std::string_view(reinterpret_cast(pages.at(cursor.pageIdx)->data) + - cursor.elemPosInPage, - numBytesToReadInPage); - cursor.elemPosInPage = 0; - cursor.pageIdx = getNextPageIndex(cursor.pageIdx); - remainingLength -= numBytesToReadInPage; - } - return result; - } -} - -bool InMemFile::equals(std::string_view keyToLookup, const ku_string_t& keyInEntry) const { - PageCursor cursor; - TypeUtils::decodeOverflowPtr(keyInEntry.overflowPtr, cursor.pageIdx, cursor.elemPosInPage); - auto lengthRead = 0u; - while (lengthRead < keyInEntry.len) { - auto numBytesToCheckInPage = std::min( - static_cast(keyInEntry.len) - lengthRead, END_OF_PAGE - cursor.elemPosInPage); - if (memcmp(keyToLookup.data() + lengthRead, - getPage(cursor.pageIdx)->data + cursor.elemPosInPage, numBytesToCheckInPage) != 0) { - return false; - } - cursor.elemPosInPage = 0; - cursor.pageIdx = getNextPageIndex(cursor.pageIdx); - lengthRead += numBytesToCheckInPage; - } - return true; -} - -void InMemFile::flush() { - for (auto& [pageIndex, page] : pages) { - // actual page index is stored inside of the InMemPage and does not correspond to the index - // in the vector - fileInfo->writeFile(page->data, BufferPoolConstants::PAGE_4KB_SIZE, - pageIndex * BufferPoolConstants::PAGE_4KB_SIZE); - } -} - -} // namespace storage -} // namespace kuzu diff --git a/src/storage/storage_structure/overflow_file.cpp b/src/storage/storage_structure/overflow_file.cpp new file mode 100644 index 00000000000..1b75d83c08e --- /dev/null +++ b/src/storage/storage_structure/overflow_file.cpp @@ -0,0 +1,232 @@ +#include "storage/storage_structure/overflow_file.h" + +#include + +#include "common/constants.h" +#include "common/exception/message.h" +#include "common/type_utils.h" +#include "common/types/types.h" +#include "storage/buffer_manager/bm_file_handle.h" +#include "storage/file_handle.h" +#include "storage/storage_structure/db_file_utils.h" +#include "storage/storage_structure/in_mem_page.h" +#include "storage/storage_utils.h" +#include "transaction/transaction.h" + +using namespace kuzu::transaction; +using namespace kuzu::common; + +namespace kuzu { +namespace storage { + +std::string OverflowFileHandle::readString(TransactionType trxType, const ku_string_t& str) { + if (ku_string_t::isShortString(str.len)) { + return str.getAsShortString(); + } else { + PageCursor cursor; + TypeUtils::decodeOverflowPtr(str.overflowPtr, cursor.pageIdx, cursor.elemPosInPage); + std::string retVal; + retVal.reserve(str.len); + int32_t remainingLength = str.len; + while (remainingLength > 0) { + auto numBytesToReadInPage = std::min( + static_cast(remainingLength), END_OF_PAGE - cursor.elemPosInPage); + auto startPosInSrc = retVal.size(); + read(trxType, cursor.pageIdx, [&](uint8_t* frame) { + // Replace rather than append, since optimistic read may call the function multiple + // times + retVal.replace(startPosInSrc, numBytesToReadInPage, + std::string_view(reinterpret_cast(frame) + cursor.elemPosInPage, + numBytesToReadInPage)); + cursor.pageIdx = *(page_idx_t*)(frame + END_OF_PAGE); + }); + remainingLength -= numBytesToReadInPage; + // After the first page we always start reading from the beginning of the page. + cursor.elemPosInPage = 0; + } + return retVal; + } +} + +bool OverflowFileHandle::equals(TransactionType trxType, std::string_view keyToLookup, + const common::ku_string_t& keyInEntry) const { + PageCursor cursor; + TypeUtils::decodeOverflowPtr(keyInEntry.overflowPtr, cursor.pageIdx, cursor.elemPosInPage); + auto lengthRead = 0u; + while (lengthRead < keyInEntry.len) { + auto numBytesToCheckInPage = std::min(static_cast(keyInEntry.len) - lengthRead, + END_OF_PAGE - cursor.elemPosInPage); + bool equal = true; + read(trxType, cursor.pageIdx, [&](auto* frame) { + equal = memcmp(keyToLookup.data() + lengthRead, frame + cursor.elemPosInPage, + numBytesToCheckInPage) == 0; + // Update the next page index + cursor.pageIdx = *(page_idx_t*)(frame + END_OF_PAGE); + }); + if (!equal) { + return false; + } + cursor.elemPosInPage = 0; + lengthRead += numBytesToCheckInPage; + } + return true; +} + +uint8_t* OverflowFileHandle::addANewPage() { + page_idx_t newPageIdx = overflowFile.getNewPageIdx(); + if (pageCache.size() > 0) { + pageCache[nextPosToWriteTo.pageIdx]->write( + END_OF_PAGE, reinterpret_cast(&newPageIdx), sizeof(page_idx_t)); + } + pageCache.emplace(newPageIdx, std::make_unique()); + nextPosToWriteTo.elemPosInPage = 0; + nextPosToWriteTo.pageIdx = newPageIdx; + return pageCache[newPageIdx]->data; +} + +void OverflowFileHandle::setStringOverflow( + const char* srcRawString, uint64_t len, ku_string_t& diskDstString) { + if (len <= ku_string_t::SHORT_STR_LENGTH) { + return; + } else if (len > BufferPoolConstants::PAGE_256KB_SIZE) { + if constexpr (StorageConstants::TRUNCATE_OVER_LARGE_STRINGS) { + len = BufferPoolConstants::PAGE_256KB_SIZE; + diskDstString.len = len; + } else { + throw RuntimeException(ExceptionMessage::overLargeStringPKValueException(len)); + } + } + uint8_t* pageToWrite = nullptr; + if (nextPosToWriteTo.pageIdx == INVALID_PAGE_IDX) { + pageToWrite = addANewPage(); + } else { + auto cached = pageCache.find(nextPosToWriteTo.pageIdx); + if (cached != pageCache.end()) { + pageToWrite = cached->second->data; + } else { + overflowFile.readFromDisk( + TransactionType::WRITE, nextPosToWriteTo.pageIdx, [&](auto* frame) { + auto page = std::make_unique(); + memcpy(page->data, frame, BufferPoolConstants::PAGE_4KB_SIZE); + pageToWrite = page->data; + pageCache.emplace(nextPosToWriteTo.pageIdx, std::move(page)); + }); + } + } + int32_t remainingLength = len; + TypeUtils::encodeOverflowPtr( + diskDstString.overflowPtr, nextPosToWriteTo.pageIdx, nextPosToWriteTo.elemPosInPage); + while (remainingLength > 0) { + auto bytesWritten = len - remainingLength; + auto numBytesToWriteInPage = std::min( + static_cast(remainingLength), END_OF_PAGE - nextPosToWriteTo.elemPosInPage); + memcpy(pageToWrite + nextPosToWriteTo.elemPosInPage, srcRawString + bytesWritten, + numBytesToWriteInPage); + remainingLength -= numBytesToWriteInPage; + nextPosToWriteTo.elemPosInPage += numBytesToWriteInPage; + if (nextPosToWriteTo.elemPosInPage >= END_OF_PAGE) { + pageToWrite = addANewPage(); + } + } +} + +ku_string_t OverflowFileHandle::writeString(std::string_view rawString) { + ku_string_t result; + result.len = rawString.length(); + auto shortStrLen = ku_string_t::SHORT_STR_LENGTH; + auto inlineLen = std::min(shortStrLen, (uint64_t)result.len); + memcpy(result.prefix, rawString.data(), inlineLen); + overflowFile.setHeaderChanged(); + setStringOverflow(rawString.data(), result.len, result); + return result; +} + +void OverflowFileHandle::prepareCommit() { + for (auto& [pageIndex, page] : pageCache) { + overflowFile.writePageToDisk(pageIndex, page->data); + } +} + +void OverflowFileHandle::read(transaction::TransactionType trxType, common::page_idx_t pageIdx, + const std::function& func) const { + if (trxType == TransactionType::WRITE) { + auto cachedPage = pageCache.find(pageIdx); + if (cachedPage != pageCache.end()) { + return func(cachedPage->second->data); + } + } + overflowFile.readFromDisk(trxType, pageIdx, func); +} + +OverflowFile::OverflowFile(const DBFileIDAndName& dbFileIdAndName, BufferManager* bufferManager, + WAL* wal, bool readOnly, common::VirtualFileSystem* vfs) + : bufferManager{bufferManager}, wal{wal}, headerChanged{false} { + auto overflowFileIDAndName = constructDBFileIDAndName(dbFileIdAndName); + dbFileID = overflowFileIDAndName.dbFileID; + fileHandle = bufferManager->getBMFileHandle(overflowFileIDAndName.fName, + readOnly ? FileHandle::O_PERSISTENT_FILE_READ_ONLY : + FileHandle::O_PERSISTENT_FILE_NO_CREATE, + BMFileHandle::FileVersionedType::VERSIONED_FILE, vfs); + if (vfs->fileOrPathExists(overflowFileIDAndName.fName)) { + readFromDisk(transaction::TransactionType::READ_ONLY, 0, + [&](auto* frame) { memcpy(&header, frame, sizeof(header)); }); + pageCounter = numPagesOnDisk = header.pages; + } else { + // Reserve a page for the header + getNewPageIdx(); + } +} + +void OverflowFile::readFromDisk(transaction::TransactionType trxType, common::page_idx_t pageIdx, + const std::function& func) const { + auto [fileHandleToPin, pageIdxToPin] = + storage::DBFileUtils::getFileHandleAndPhysicalPageIdxToPin( + *getBMFileHandle(), pageIdx, *wal, trxType); + bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, func); +} + +void OverflowFile::writePageToDisk(common::page_idx_t pageIdx, uint8_t* data) const { + if (pageIdx < numPagesOnDisk) { + // TODO: updatePage does an unnecessary read + copy. We just want to overwrite + DBFileUtils::updatePage(*getBMFileHandle(), dbFileID, pageIdx, false, *bufferManager, *wal, + [&](auto* frame) { memcpy(frame, data, BufferPoolConstants::PAGE_4KB_SIZE); }); + } else { + fileHandle->writePage(data, pageIdx); + } +} + +void OverflowFile::prepareCommit() { + if (fileHandle->getNumPages() < pageCounter) { + fileHandle->addNewPages(pageCounter - fileHandle->getNumPages()); + } + // TODO(bmwinger): Ideally this could be done separately and in parallel by each HashIndex + // However fileHandle->addNewPages needs to be called beforehand, + // but after each HashIndex::prepareCommit has written to the in-memory pages + for (auto& handle : handles) { + handle->prepareCommit(); + } + if (headerChanged) { + uint8_t page[BufferPoolConstants::PAGE_4KB_SIZE]; + header.pages = pageCounter; + memcpy(page, &header, sizeof(header)); + writePageToDisk(0, page); + } +} + +void OverflowFile::checkpointInMemory() { + headerChanged = false; + numPagesOnDisk = pageCounter; +} + +void OverflowFile::rollbackInMemory() { + readFromDisk(transaction::TransactionType::READ_ONLY, 0, + [&](auto* frame) { memcpy(&header, frame, sizeof(header)); }); + numPagesOnDisk = pageCounter = header.pages; + for (auto i = 0u; i < handles.size(); i++) { + auto& handle = handles[i]; + handle->rollbackInMemory(header.cursors[i]); + } +} + +} // namespace storage +} // namespace kuzu diff --git a/src/storage/wal/wal.cpp b/src/storage/wal/wal.cpp index 29255101d61..dfc729dd543 100644 --- a/src/storage/wal/wal.cpp +++ b/src/storage/wal/wal.cpp @@ -85,13 +85,6 @@ void WAL::logRdfGraphRecord(table_id_t rdfGraphID, table_id_t resourceTableID, addNewWALRecordNoLock(walRecord); } -void WAL::logOverflowFileNextBytePosRecord(DBFileID dbFileID, uint64_t prevNextByteToWriteTo) { - lock_t lck{mtx}; - WALRecord walRecord = - WALRecord::newOverflowFileNextBytePosRecord(dbFileID, prevNextByteToWriteTo); - addNewWALRecordNoLock(walRecord); -} - void WAL::logCopyTableRecord(table_id_t tableID) { lock_t lck{mtx}; WALRecord walRecord = WALRecord::newCopyTableRecord(tableID); diff --git a/src/storage/wal/wal_record.cpp b/src/storage/wal/wal_record.cpp index 54879e79117..e818d9c561e 100644 --- a/src/storage/wal/wal_record.cpp +++ b/src/storage/wal/wal_record.cpp @@ -60,9 +60,6 @@ bool WALRecord::operator==(const WALRecord& rhs) const { case WALRecordType::CREATE_RDF_GRAPH_RECORD: { return rdfGraphRecord == rhs.rdfGraphRecord; } - case WALRecordType::OVERFLOW_FILE_NEXT_BYTE_POS_RECORD: { - return diskOverflowFileNextBytePosRecord == rhs.diskOverflowFileNextBytePosRecord; - } case WALRecordType::COPY_TABLE_RECORD: { return copyTableRecord == rhs.copyTableRecord; } @@ -104,9 +101,6 @@ std::string walRecordTypeToString(WALRecordType walRecordType) { case WALRecordType::CREATE_RDF_GRAPH_RECORD: { return "CREATE_RDF_GRAPH_RECORD"; } - case WALRecordType::OVERFLOW_FILE_NEXT_BYTE_POS_RECORD: { - return "OVERFLOW_FILE_NEXT_BYTE_POS_RECORD"; - } case WALRecordType::COPY_TABLE_RECORD: { return "COPY_TABLE_RECORD"; } @@ -185,15 +179,6 @@ WALRecord WALRecord::newRdfGraphRecord(table_id_t rdfGraphID, table_id_t resourc return retVal; } -WALRecord WALRecord::newOverflowFileNextBytePosRecord( - DBFileID dbFileID_, uint64_t prevNextByteToWriteTo_) { - WALRecord retVal; - retVal.recordType = WALRecordType::OVERFLOW_FILE_NEXT_BYTE_POS_RECORD; - retVal.diskOverflowFileNextBytePosRecord = - DiskOverflowFileNextBytePosRecord(dbFileID_, prevNextByteToWriteTo_); - return retVal; -} - WALRecord WALRecord::newCopyTableRecord(table_id_t tableID) { WALRecord retVal; retVal.recordType = WALRecordType::COPY_TABLE_RECORD; diff --git a/src/storage/wal_replayer.cpp b/src/storage/wal_replayer.cpp index 011eaa868ba..a170cf21da5 100644 --- a/src/storage/wal_replayer.cpp +++ b/src/storage/wal_replayer.cpp @@ -75,9 +75,6 @@ void WALReplayer::replayWALRecord(WALRecord& walRecord) { case WALRecordType::CREATE_RDF_GRAPH_RECORD: { replayRdfGraphRecord(walRecord); } break; - case WALRecordType::OVERFLOW_FILE_NEXT_BYTE_POS_RECORD: { - replayOverflowFileNextBytePosRecord(walRecord); - } break; case WALRecordType::COPY_TABLE_RECORD: { replayCopyTableRecord(walRecord); } break; @@ -190,35 +187,6 @@ void WALReplayer::replayRdfGraphRecord(const WALRecord& walRecord) { replayCreateTableRecord(literalTripleTableWALRecord); } -void WALReplayer::replayOverflowFileNextBytePosRecord(const WALRecord& walRecord) { - // If we are recovering we do not replay OVERFLOW_FILE_NEXT_BYTE_POS_RECORD because - // this record is intended for rolling back a transaction to ensure that we can - // recover the overflow space allocated for the write transaction by calling - // DiskOverflowFile::resetNextBytePosToWriteTo(...). However during recovery, storageManager - // is null, so we cannot construct this value. - if (isRecovering) { - return; - } - KU_ASSERT(walRecord.diskOverflowFileNextBytePosRecord.dbFileID.isOverflow); - auto dbFileID = walRecord.diskOverflowFileNextBytePosRecord.dbFileID; - DiskOverflowFile* diskOverflowFile; - switch (dbFileID.dbFileType) { - case DBFileType::NODE_INDEX: { - auto index = storageManager->getPKIndex(dbFileID.nodeIndexID.tableID); - diskOverflowFile = index->getDiskOverflowFile(); - } break; - default: - throw RuntimeException("Unsupported dbFileID " + dbFileTypeToString(dbFileID.dbFileType) + - " for OVERFLOW_FILE_NEXT_BYTE_POS_RECORD."); - } - // Reset NextBytePosToWriteTo if we are rolling back. - if (!isCheckpoint) { - diskOverflowFile->resetNextBytePosToWriteTo( - walRecord.diskOverflowFileNextBytePosRecord.prevNextBytePosToWriteTo); - } - diskOverflowFile->resetLoggedNewOverflowFileNextBytePosRecord(); -} - void WALReplayer::replayCopyTableRecord(const kuzu::storage::WALRecord& walRecord) { auto tableID = walRecord.copyTableRecord.tableID; if (isCheckpoint) { @@ -398,7 +366,7 @@ BMFileHandle* WALReplayer::getVersionedFileHandleIfWALVersionAndBMShouldBeCleare } case DBFileType::NODE_INDEX: { auto index = storageManager->getPKIndex(dbFileID.nodeIndexID.tableID); - return dbFileID.isOverflow ? index->getDiskOverflowFile()->getFileHandle() : + return dbFileID.isOverflow ? index->getOverflowFile()->getBMFileHandle() : index->getFileHandle(); } default: {