Skip to content

Commit

Permalink
Cache DiskArray PIP updates in memory
Browse files Browse the repository at this point in the history
Updates are written in prepareCommit once, instead of going through the bufferManager each time they are updated or accessed
This was already the case for the read-only PIPs
  • Loading branch information
benjaminwinger committed Mar 28, 2024
1 parent e092969 commit e45ef54
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 72 deletions.
14 changes: 7 additions & 7 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct PIP {
common::page_idx_t nextPipPageIdx;
common::page_idx_t pageIdxs[NUM_PAGE_IDXS_PER_PIP];
};
static_assert(sizeof(PIP) == common::BufferPoolConstants::PAGE_4KB_SIZE);

struct PIPWrapper {
PIPWrapper(FileHandle& fileHandle, common::page_idx_t pipPageIdx);
Expand All @@ -65,15 +66,14 @@ struct PIPWrapper {
};

struct PIPUpdates {
// updatedPipIdxs stores the idx's of existing PIPWrappers (not the physical pageIdx of those
// PIPs), which are stored in the pipPageIdx field of PIPWrapper. These are used to replace the
// PIPWrappers quickly during in-memory checkpointing.
std::unordered_set<uint64_t> updatedPipIdxs;
std::vector<common::page_idx_t> pipPageIdxsOfInsertedPIPs;
// Since PIPs are only appended to, the only existing PIP which may be modified is the last one
// This gets tracked separately to make indexing into newPIPs simpler.
std::optional<PIPWrapper> updatedLastPIP;
std::vector<PIPWrapper> newPIPs;

inline void clear() {
updatedPipIdxs.clear();
pipPageIdxsOfInsertedPIPs.clear();
updatedLastPIP.reset();
newPIPs.clear();
}
};

Expand Down
102 changes: 37 additions & 65 deletions src/storage/storage_structure/disk_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,27 +189,13 @@ void BaseDiskArrayInternal::setNextPIPPageIDxOfPIPNoLock(DiskArrayHeader* update
// This happens if the first pip is being inserted, in which case we need to change the header.
if (pipIdxOfPreviousPIP == UINT64_MAX) {
updatedDiskArrayHeader->firstPIPPageIdx = nextPIPPageIdx;
} else if (pips.empty()) {
pipUpdates.newPIPs[pipIdxOfPreviousPIP].pipContents.nextPipPageIdx = nextPIPPageIdx;
} else {
page_idx_t pipPageIdxOfPreviousPIP = getUpdatedPageIdxOfPipNoLock(pipIdxOfPreviousPIP);
/*
* Note that we can safely pass insertingNewPage argument here. There are two cases;
* 1) if pipPageIdxOfPreviousPIP is a new PIP: in that case the previous caller
* would have already created the WAL page for it, so this function is not creating
* pipPageIdxOfPreviousPIP. 2) if pipPageIdxOfPreviousPIP is an existing PIP, in which
* case again this function is not creating pipPageIdxOfPreviousPIP.
*/
DBFileUtils::updatePage((BMFileHandle&)fileHandle, dbFileID, pipPageIdxOfPreviousPIP,
false /* not inserting a new page */, *bufferManager, *wal,
[&nextPIPPageIdx](
const uint8_t* frame) -> void { ((PIP*)frame)->nextPipPageIdx = nextPIPPageIdx; });
// The above updatePage operation changes the "previousPIP" identified by
// pipIdxOfPreviousPIP, so we put it to updatedPIPIdxs if it was a pip that already existed
// before this transaction started. If pipIdxOfPreviousPIP >= pips.size() then it must
// already be in pipUpdates.pipPageIdxsOfInsertedPIPs, so we do not need to insert it to
// pipUpdates.pipPageIdxsOfInsertedPIPs (and hence there is no else to the below if).
if (pipIdxOfPreviousPIP < pips.size()) {
pipUpdates.updatedPipIdxs.insert(pipIdxOfPreviousPIP);
if (!pipUpdates.updatedLastPIP.has_value()) {
pipUpdates.updatedLastPIP = std::make_optional(pips[pipIdxOfPreviousPIP]);
}
pipUpdates.updatedLastPIP->pipContents.nextPipPageIdx = nextPIPPageIdx;
}
}

Expand All @@ -220,30 +206,15 @@ page_idx_t BaseDiskArrayInternal::getAPPageIdxNoLock(page_idx_t apIdx, Transacti
if ((trxType == TransactionType::READ_ONLY) || !hasPIPUpdatesNoLock(pipIdx)) {
return pips[pipIdx].pipContents.pageIdxs[offsetInPIP];
} else {
page_idx_t retVal;
page_idx_t pageIdxOfUpdatedPip = getUpdatedPageIdxOfPipNoLock(pipIdx);
if (ku_dynamic_cast<FileHandle&, BMFileHandle&>(fileHandle)
.hasWALPageVersionNoWALPageIdxLock(pageIdxOfUpdatedPip)) {
((BMFileHandle&)fileHandle).acquireWALPageIdxLock(pageIdxOfUpdatedPip);
DBFileUtils::readWALVersionOfPage((BMFileHandle&)fileHandle, pageIdxOfUpdatedPip,
*bufferManager, *wal, [&retVal, &offsetInPIP](const uint8_t* frame) -> void {
retVal = ((PIP*)frame)->pageIdxs[offsetInPIP];
});
} else {
bufferManager->optimisticRead((BMFileHandle&)fileHandle, pageIdxOfUpdatedPip,
[&retVal, &offsetInPIP](const uint8_t* frame) -> void {
retVal = ((PIP*)frame)->pageIdxs[offsetInPIP];
});
}
return retVal;
return pipUpdates.newPIPs[pipIdx - pips.size()].pipContents.pageIdxs[offsetInPIP];
}
}

page_idx_t BaseDiskArrayInternal::getUpdatedPageIdxOfPipNoLock(uint64_t pipIdx) {
if (pipIdx < pips.size()) {
return pips[pipIdx].pipPageIdx;
}
return pipUpdates.pipPageIdxsOfInsertedPIPs[pipIdx - pips.size()];
return pipUpdates.newPIPs[pipIdx - pips.size()].pipPageIdx;
}

void BaseDiskArrayInternal::clearWALPageVersionAndRemovePageFromFrameIfNecessary(
Expand All @@ -262,24 +233,23 @@ void BaseDiskArrayInternal::checkpointOrRollbackInMemoryIfNecessaryNoLock(bool i
headerForWriteTrx = header;
}
clearWALPageVersionAndRemovePageFromFrameIfNecessary(headerPageIdx);
for (uint64_t pipIdxOfUpdatedPIP : pipUpdates.updatedPipIdxs) {
if (pipUpdates.updatedLastPIP.has_value()) {
// Note: This should not cause a memory leak because PIPWrapper is a struct. So we
// should overwrite the previous PIPWrapper's memory.
if (isCheckpoint) {
pips[pipIdxOfUpdatedPIP] = PIPWrapper(fileHandle, pips[pipIdxOfUpdatedPIP].pipPageIdx);
pips.back() = *pipUpdates.updatedLastPIP;
}
clearWALPageVersionAndRemovePageFromFrameIfNecessary(pips[pipIdxOfUpdatedPIP].pipPageIdx);
clearWALPageVersionAndRemovePageFromFrameIfNecessary(pips.back().pipPageIdx);
}

for (page_idx_t pipPageIdxOfNewPIP : pipUpdates.pipPageIdxsOfInsertedPIPs) {
for (auto& newPIP : pipUpdates.newPIPs) {
clearWALPageVersionAndRemovePageFromFrameIfNecessary(newPIP.pipPageIdx);
if (isCheckpoint) {
pips.emplace_back(fileHandle, pipPageIdxOfNewPIP);
}
clearWALPageVersionAndRemovePageFromFrameIfNecessary(pipPageIdxOfNewPIP);
if (!isCheckpoint) {
pips.emplace_back(std::move(newPIP));
} else {
// These are newly inserted pages, so we can truncate the file handle.
((BMFileHandle&)this->fileHandle)
.removePageIdxAndTruncateIfNecessary(pipPageIdxOfNewPIP);
.removePageIdxAndTruncateIfNecessary(newPIP.pipPageIdx);
}
}
// Note that we already updated the header to its correct state above.
Expand All @@ -300,6 +270,16 @@ void BaseDiskArrayInternal::prepareCommit() {
memcpy(frame, &headerForWriteTrx, sizeof(headerForWriteTrx));
});
}
if (pipUpdates.updatedLastPIP.has_value()) {
DBFileUtils::updatePage(bmFileHandle, dbFileID, pipUpdates.updatedLastPIP->pipPageIdx, true,
*bufferManager, *wal, [&](auto* frame) {
memcpy(frame, &pipUpdates.updatedLastPIP->pipContents, sizeof(PIP));
});
}
for (auto& newPIP : pipUpdates.newPIPs) {
DBFileUtils::updatePage(bmFileHandle, dbFileID, newPIP.pipPageIdx, true, *bufferManager,
*wal, [&](auto* frame) { memcpy(frame, &newPIP.pipContents, sizeof(PIP)); });
}
}

bool BaseDiskArrayInternal::hasPIPUpdatesNoLock(uint64_t pipIdx) {
Expand All @@ -309,7 +289,7 @@ bool BaseDiskArrayInternal::hasPIPUpdatesNoLock(uint64_t pipIdx) {
if (pipIdx >= pips.size()) {
return true;
}
return pipUpdates.updatedPipIdxs.contains(pipIdx);
return (pipIdx == pips.size() - 1) && pipUpdates.updatedLastPIP;
}

std::pair<page_idx_t, bool>
Expand All @@ -335,35 +315,27 @@ BaseDiskArrayInternal::getAPPageIdxAndAddAPToPIPIfNecessaryForWriteTrxNoLock(
uint64_t pipIdx = pipIdxAndOffsetOfNewAP.first;
uint64_t offsetOfNewAPInPIP = pipIdxAndOffsetOfNewAP.second;
updatedDiskArrayHeader->numAPs++;
page_idx_t pipPageIdx = DBFileUtils::NULL_PAGE_IDX;
bool isInsertingANewPIPPage = false;
if (pipIdx < pips.size()) {
KU_ASSERT(pipIdx == pips.size() - 1);
// We do not need to insert a new pip and we need to add newAPPageIdx to a PIP that
// existed before this transaction started.
pipUpdates.updatedPipIdxs.insert(pipIdx);
pipPageIdx = pips[pipIdx].pipPageIdx;
} else if ((pipIdx - pips.size()) < pipUpdates.pipPageIdxsOfInsertedPIPs.size()) {
if (!pipUpdates.updatedLastPIP.has_value()) {
pipUpdates.updatedLastPIP = std::make_optional(pips[pipIdx]);
}
pipUpdates.updatedLastPIP->pipContents.pageIdxs[offsetOfNewAPInPIP] = newAPPageIdx;
} else if ((pipIdx - pips.size()) < pipUpdates.newPIPs.size()) {
// We do not need to insert a new PIP and we need to add newAPPageIdx to a new PIP that
// already got created after this transaction started.
pipPageIdx = pipUpdates.pipPageIdxsOfInsertedPIPs[pipIdx - pips.size()];
auto& pip = pipUpdates.newPIPs[pipIdx - pips.size()];
pip.pipContents.pageIdxs[offsetOfNewAPInPIP] = newAPPageIdx;
} else {
// We need to create a new PIP and make the previous PIP (or the header) point to it.
isInsertingANewPIPPage = true;
pipPageIdx = fileHandle.addNewPage();
pipUpdates.pipPageIdxsOfInsertedPIPs.push_back(pipPageIdx);
auto pipPageIdx = fileHandle.addNewPage();
pipUpdates.newPIPs.emplace_back(pipPageIdx);
uint64_t pipIdxOfPreviousPIP = pipIdx - 1;
setNextPIPPageIDxOfPIPNoLock(updatedDiskArrayHeader, pipIdxOfPreviousPIP, pipPageIdx);
pipUpdates.newPIPs.back().pipContents.pageIdxs[offsetOfNewAPInPIP] = newAPPageIdx;
}
// Finally we update the PIP page (possibly newly created) and add newAPPageIdx into it.
DBFileUtils::updatePage((BMFileHandle&)fileHandle, dbFileID, pipPageIdx,
isInsertingANewPIPPage, *bufferManager, *wal,
[&isInsertingANewPIPPage, &newAPPageIdx, &offsetOfNewAPInPIP](
const uint8_t* frame) -> void {
if (isInsertingANewPIPPage) {
((PIP*)frame)->nextPipPageIdx = DBFileUtils::NULL_PAGE_IDX;
}
((PIP*)frame)->pageIdxs[offsetOfNewAPInPIP] = newAPPageIdx;
});
return std::make_pair(newAPPageIdx, true /* inserting a new ap page */);
}
}
Expand Down

0 comments on commit e45ef54

Please sign in to comment.