Skip to content

Commit

Permalink
Bypass WAL when writing new pages to the disk array
Browse files Browse the repository at this point in the history
Currently only enabled in the hash index since it doesn't support
rolling back. This does mean that rollbacks will sometimes waste pages
when inserting into the hash index with CREATE, which is a regression.
  • Loading branch information
benjaminwinger committed Mar 28, 2024
1 parent fed83ba commit e092969
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 38 deletions.
2 changes: 2 additions & 0 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "hash_index_header.h"
#include "hash_index_slot.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/file_handle.h"
#include "storage/index/hash_index_builder.h"
#include "storage/index/hash_index_utils.h"
Expand Down Expand Up @@ -349,6 +350,7 @@ class PrimaryKeyIndex {
std::shared_ptr<BMFileHandle> fileHandle;
std::unique_ptr<OverflowFile> overflowFile;
std::vector<std::unique_ptr<OnDiskHashIndex>> hashIndices;
BufferManager& bufferManager;
};

} // namespace storage
Expand Down
62 changes: 41 additions & 21 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class BaseDiskArrayInternal {
// Used when loading from file
BaseDiskArrayInternal(FileHandle& fileHandle, DBFileID dbFileID,
common::page_idx_t headerPageIdx, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction);
transaction::Transaction* transaction, bool bypassWAL = false);

virtual ~BaseDiskArrayInternal() = default;

Expand Down Expand Up @@ -204,16 +204,7 @@ class BaseDiskArrayInternal {

inline iterator& operator+=(size_t increment) { return seek(idx + increment); }

~iterator() {
// TODO(bmwinger): this should be moved into WALPageIdxAndFrame's destructor (or
// something similar)
if (walPageIdxAndFrame.originalPageIdx != common::INVALID_PAGE_IDX) {
diskArray.bufferManager->unpin(
*diskArray.wal->fileHandle, walPageIdxAndFrame.pageIdxInWAL);
common::ku_dynamic_cast<FileHandle&, BMFileHandle&>(diskArray.fileHandle)
.releaseWALPageIdxLock(walPageIdxAndFrame.originalPageIdx);
}
}
~iterator() { unpin(); }

std::span<uint8_t> operator*() const {
KU_ASSERT(idx < diskArray.headerForWriteTrx.numElements);
Expand All @@ -224,24 +215,48 @@ class BaseDiskArrayInternal {
inline uint64_t size() const { return diskArray.headerForWriteTrx.numElements; }

private:
inline void getPage(common::page_idx_t newPageIdx, bool isNewlyAdded) {
inline void unpin() {
// TODO(bmwinger): this should be moved into WALPageIdxAndFrame's destructor (or
// something similar)
auto& bmFileHandle =
common::ku_dynamic_cast<FileHandle&, BMFileHandle&>(diskArray.fileHandle);
if (walPageIdxAndFrame.pageIdxInWAL != common::INVALID_PAGE_IDX) {
// unpin current page
diskArray.bufferManager->unpin(
*diskArray.wal->fileHandle, walPageIdxAndFrame.pageIdxInWAL);
common::ku_dynamic_cast<FileHandle&, BMFileHandle&>(diskArray.fileHandle)
.releaseWALPageIdxLock(walPageIdxAndFrame.originalPageIdx);
bmFileHandle.releaseWALPageIdxLock(walPageIdxAndFrame.originalPageIdx);
} else if (walPageIdxAndFrame.originalPageIdx != common::INVALID_PAGE_IDX) {
bmFileHandle.setLockedPageDirty(walPageIdxAndFrame.originalPageIdx);
diskArray.bufferManager->unpin(bmFileHandle, walPageIdxAndFrame.originalPageIdx);
}
}
inline void getPage(common::page_idx_t newPageIdx, bool isNewlyAdded) {
auto& bmFileHandle =
common::ku_dynamic_cast<FileHandle&, BMFileHandle&>(diskArray.fileHandle);
unpin();
if (newPageIdx <= diskArray.lastPageOnDisk) {
// Pin new page
walPageIdxAndFrame =
DBFileUtils::createWALVersionIfNecessaryAndPinPage(newPageIdx, isNewlyAdded,
bmFileHandle, diskArray.dbFileID, *diskArray.bufferManager, *diskArray.wal);
} else {
walPageIdxAndFrame.frame = diskArray.bufferManager->pin(bmFileHandle, newPageIdx,
isNewlyAdded ? BufferManager::PageReadPolicy::DONT_READ_PAGE :
BufferManager::PageReadPolicy::READ_PAGE);
walPageIdxAndFrame.originalPageIdx = newPageIdx;
walPageIdxAndFrame.pageIdxInWAL = common::INVALID_PAGE_IDX;
}
// Pin new page
walPageIdxAndFrame = DBFileUtils::createWALVersionIfNecessaryAndPinPage(newPageIdx,
isNewlyAdded, (BMFileHandle&)diskArray.fileHandle, diskArray.dbFileID,
*diskArray.bufferManager, *diskArray.wal);
}
};

iterator iter(uint64_t valueSize);

protected:
// Updates to new pages (new to this transaction) bypass the wal file.
void updatePage(uint64_t pageIdx, bool isNewPage, std::function<void(uint8_t*)> updateOp);

void updateLastPageOnDisk();

uint64_t pushBackNoLock(std::span<uint8_t> val);

inline uint64_t getNumElementsNoLock(transaction::TransactionType trxType) {
Expand Down Expand Up @@ -310,6 +325,7 @@ class BaseDiskArrayInternal {
std::shared_mutex diskArraySharedMtx;
// For write transactions only
common::page_idx_t lastAPPageIdx;
common::page_idx_t lastPageOnDisk;
};

template<typename U>
Expand All @@ -324,9 +340,14 @@ class BaseDiskArray {
BaseDiskArray(FileHandle& fileHandle, common::page_idx_t headerPageIdx, uint64_t elementSize)
: diskArray(fileHandle, headerPageIdx, elementSize) {}
// Used when loading from file
// If bypassWAL is set, the buffer manager is used to pages new to this transaction to the
// original file, but does not handle flushing them. BufferManager::flushAllDirtyPagesInFrames
// should be called on this file handle exactly once during prepare commit.
BaseDiskArray(FileHandle& fileHandle, DBFileID dbFileID, common::page_idx_t headerPageIdx,
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction)
: diskArray(fileHandle, dbFileID, headerPageIdx, bufferManager, wal, transaction) {}
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction,
bool bypassWAL = false)
: diskArray(
fileHandle, dbFileID, headerPageIdx, bufferManager, wal, transaction, bypassWAL) {}

// Note: This function is to be used only by the WRITE trx.
// The return value is the idx of val in array.
Expand Down Expand Up @@ -372,7 +393,6 @@ class BaseDiskArray {
return *this;
}


inline uint64_t idx() const { return iter.idx; }

inline void pushBack() { return iter.pushBack(); }
Expand Down
18 changes: 14 additions & 4 deletions src/storage/index/hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <type_traits>

#include "common/assert.h"
#include "common/cast.h"
#include "common/exception/runtime.h"
#include "common/string_format.h"
#include "common/string_utils.h"
Expand All @@ -15,6 +16,8 @@
#include "common/types/ku_string.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/file_handle.h"
#include "storage/index/hash_index_builder.h"
#include "storage/index/hash_index_header.h"
#include "storage/index/hash_index_slot.h"
Expand Down Expand Up @@ -108,7 +111,7 @@ HashIndex<T>::HashIndex(const DBFileIDAndName& dbFileIDAndName,
// TODO: Handle data not existing
headerArray = std::make_unique<BaseDiskArray<HashIndexHeader>>(*fileHandle,
dbFileIDAndName.dbFileID, NUM_HEADER_PAGES * indexPos + INDEX_HEADER_ARRAY_HEADER_PAGE_IDX,
&bm, wal, Transaction::getDummyReadOnlyTrx().get());
&bm, wal, Transaction::getDummyReadOnlyTrx().get(), true /*bypassWAL*/);
// Read indexHeader from the headerArray, which contains only one element.
this->indexHeaderForReadTrx = std::make_unique<HashIndexHeader>(
headerArray->get(INDEX_HEADER_IDX_IN_ARRAY, TransactionType::READ_ONLY));
Expand All @@ -117,10 +120,10 @@ HashIndex<T>::HashIndex(const DBFileIDAndName& dbFileIDAndName,
this->indexHeaderForReadTrx->keyDataTypeID == TypeUtils::getPhysicalTypeIDForType<T>());
pSlots = std::make_unique<BaseDiskArray<Slot<T>>>(*fileHandle, dbFileIDAndName.dbFileID,
NUM_HEADER_PAGES * indexPos + P_SLOTS_HEADER_PAGE_IDX, &bm, wal,
Transaction::getDummyReadOnlyTrx().get());
Transaction::getDummyReadOnlyTrx().get(), true /*bypassWAL*/);
oSlots = std::make_unique<BaseDiskArray<Slot<T>>>(*fileHandle, dbFileIDAndName.dbFileID,
NUM_HEADER_PAGES * indexPos + O_SLOTS_HEADER_PAGE_IDX, &bm, wal,
Transaction::getDummyReadOnlyTrx().get());
Transaction::getDummyReadOnlyTrx().get(), true /*bypassWAL*/);
// Initialize functions.
localStorage = std::make_unique<HashIndexLocalStorage<BufferKeyType>>();
}
Expand Down Expand Up @@ -636,7 +639,7 @@ template class HashIndex<ku_string_t>;
PrimaryKeyIndex::PrimaryKeyIndex(const DBFileIDAndName& dbFileIDAndName, bool readOnly,
common::PhysicalTypeID keyDataType, BufferManager& bufferManager, WAL* wal,
VirtualFileSystem* vfs)
: keyDataTypeID(keyDataType), hasRunPrepareCommit{false} {
: hasRunPrepareCommit{false}, keyDataTypeID(keyDataType), bufferManager{bufferManager} {
fileHandle = bufferManager.getBMFileHandle(dbFileIDAndName.fName,
readOnly ? FileHandle::O_PERSISTENT_FILE_READ_ONLY :
FileHandle::O_PERSISTENT_FILE_NO_CREATE,
Expand Down Expand Up @@ -734,6 +737,13 @@ void PrimaryKeyIndex::prepareCommit() {
if (overflowFile) {
overflowFile->prepareCommit();
}
// Make sure that changes which bypassed the WAL are written.
// There is no other mechanism for enforcing that they are flushed
// and they will be dropped when the file handle is destroyed.
// TODO: Should eventually be moved into the disk array when the disk array can
// generally handle bypassing the WAL, but should only be run once per file, not once per
// disk array
bufferManager.flushAllDirtyPagesInFrames(*fileHandle);
hasRunPrepareCommit = true;
}
}
Expand Down
62 changes: 49 additions & 13 deletions src/storage/storage_structure/disk_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
#include "common/types/types.h"
#include "common/utils.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/file_handle.h"
#include "storage/storage_structure/db_file_utils.h"
#include "storage/storage_utils.h"
#include "transaction/transaction.h"

using namespace kuzu::common;
using namespace kuzu::transaction;
Expand Down Expand Up @@ -44,10 +46,10 @@ BaseDiskArrayInternal::BaseDiskArrayInternal(

BaseDiskArrayInternal::BaseDiskArrayInternal(FileHandle& fileHandle, DBFileID dbFileID,
page_idx_t headerPageIdx, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction)
transaction::Transaction* transaction, bool bypassWAL)
: fileHandle{fileHandle}, dbFileID{dbFileID}, headerPageIdx{headerPageIdx},
hasTransactionalUpdates{false}, bufferManager{bufferManager}, wal{wal},
lastAPPageIdx{INVALID_PAGE_IDX} {
lastAPPageIdx{INVALID_PAGE_IDX}, lastPageOnDisk{INVALID_PAGE_IDX} {
auto [fileHandleToPin, pageIdxToPin] = DBFileUtils::getFileHandleAndPhysicalPageIdxToPin(
ku_dynamic_cast<FileHandle&, BMFileHandle&>(fileHandle), headerPageIdx, *wal,
transaction->getType());
Expand All @@ -60,6 +62,21 @@ BaseDiskArrayInternal::BaseDiskArrayInternal(FileHandle& fileHandle, DBFileID db
pips.emplace_back(fileHandle, pips[pips.size() - 1].pipContents.nextPipPageIdx);
}
}
// If bypassing the WAL is disabled, just leave the lastPageOnDisk as invalid, as then all pages
// will be treated as updates to existing ones
if (bypassWAL) {
updateLastPageOnDisk();
}
}

void BaseDiskArrayInternal::updateLastPageOnDisk() {
auto numElements = getNumElementsNoLock(TransactionType::READ_ONLY);
if (numElements > 0) {
auto apCursor = getAPIdxAndOffsetInAP(numElements);
lastPageOnDisk = getAPPageIdxNoLock(apCursor.pageIdx, TransactionType::READ_ONLY);
} else {
lastPageOnDisk = 0;
}
}

uint64_t BaseDiskArrayInternal::getNumElements(TransactionType trxType) {
Expand All @@ -86,7 +103,7 @@ void BaseDiskArrayInternal::get(uint64_t idx, TransactionType trxType, std::span
page_idx_t apPageIdx = getAPPageIdxNoLock(apCursor.pageIdx, trxType);
auto& bmFileHandle = (BMFileHandle&)fileHandle;
if (trxType == TransactionType::READ_ONLY || !hasTransactionalUpdates ||
!bmFileHandle.hasWALPageVersionNoWALPageIdxLock(apPageIdx)) {
apPageIdx > lastPageOnDisk || !bmFileHandle.hasWALPageVersionNoWALPageIdxLock(apPageIdx)) {
bufferManager->optimisticRead(bmFileHandle, apPageIdx, [&](const uint8_t* frame) -> void {
memcpy(val.data(), frame + apCursor.elemPosInPage, val.size());
});
Expand All @@ -99,6 +116,24 @@ void BaseDiskArrayInternal::get(uint64_t idx, TransactionType trxType, std::span
}
}

void BaseDiskArrayInternal::updatePage(
uint64_t pageIdx, bool isNewPage, std::function<void(uint8_t*)> updateOp) {
auto& bmFileHandle = (BMFileHandle&)fileHandle;
// Pages which are new to this transaction are written directly to the file
// Pages which previously existed are written to the WAL file
if (pageIdx <= lastPageOnDisk) {
DBFileUtils::updatePage(bmFileHandle, dbFileID, pageIdx,
false /* not inserting a new page */, *bufferManager, *wal, updateOp);
} else {
auto frame = bufferManager->pin(bmFileHandle, pageIdx,
isNewPage ? BufferManager::PageReadPolicy::DONT_READ_PAGE :
BufferManager::PageReadPolicy::READ_PAGE);
updateOp(frame);
bmFileHandle.setLockedPageDirty(pageIdx);
bufferManager->unpin(bmFileHandle, pageIdx);
}
}

void BaseDiskArrayInternal::update(uint64_t idx, std::span<uint8_t> val) {
std::unique_lock xLck{diskArraySharedMtx};
hasTransactionalUpdates = true;
Expand All @@ -114,11 +149,9 @@ void BaseDiskArrayInternal::update(uint64_t idx, std::span<uint8_t> val) {
// getAPPageIdxNoLock logic needs to change to give the same guarantee (e.g., an apIdx = 0, may
// no longer to be guaranteed to be in pips[0].)
page_idx_t apPageIdx = getAPPageIdxNoLock(apCursor.pageIdx, TransactionType::WRITE);
DBFileUtils::updatePage((BMFileHandle&)fileHandle, dbFileID, apPageIdx,
false /* not inserting a new page */, *bufferManager, *wal,
[&apCursor, &val](uint8_t* frame) -> void {
memcpy(frame + apCursor.elemPosInPage, val.data(), val.size());
});
updatePage(apPageIdx, false /*isNewPage=*/, [&apCursor, &val](uint8_t* frame) -> void {
memcpy(frame + apCursor.elemPosInPage, val.data(), val.size());
});
}

uint64_t BaseDiskArrayInternal::pushBack(std::span<uint8_t> val) {
Expand All @@ -144,10 +177,9 @@ uint64_t BaseDiskArrayInternal::pushBackNoLock(std::span<uint8_t> val) {
getAPPageIdxAndAddAPToPIPIfNecessaryForWriteTrxNoLock(&headerForWriteTrx, apCursor.pageIdx);
// Now do the push back.
lastAPPageIdx = apPageIdx;
DBFileUtils::updatePage((BMFileHandle&)(fileHandle), dbFileID, apPageIdx, isNewlyAdded,
*bufferManager, *wal, [&apCursor, &val](uint8_t* frame) -> void {
memcpy(frame + apCursor.elemPosInPage, val.data(), val.size());
});
updatePage(apPageIdx, isNewlyAdded, [&apCursor, &val](uint8_t* frame) -> void {
memcpy(frame + apCursor.elemPosInPage, val.data(), val.size());
});
headerForWriteTrx.numElements++;
return elementIdx;
}
Expand Down Expand Up @@ -253,12 +285,16 @@ void BaseDiskArrayInternal::checkpointOrRollbackInMemoryIfNecessaryNoLock(bool i
// Note that we already updated the header to its correct state above.
pipUpdates.clear();
hasTransactionalUpdates = false;
if (isCheckpoint && lastPageOnDisk != INVALID_PAGE_IDX) {
updateLastPageOnDisk();
}
}

void BaseDiskArrayInternal::prepareCommit() {
auto& bmFileHandle = ku_dynamic_cast<FileHandle&, BMFileHandle&>(fileHandle);
// Update header if it has changed
if (headerForWriteTrx != header) {
DBFileUtils::updatePage((BMFileHandle&)(fileHandle), dbFileID, headerPageIdx,
DBFileUtils::updatePage(bmFileHandle, dbFileID, headerPageIdx,
false /* not inserting a new page */, *bufferManager, *wal,
[&](uint8_t* frame) -> void {
memcpy(frame, &headerForWriteTrx, sizeof(headerForWriteTrx));
Expand Down

0 comments on commit e092969

Please sign in to comment.