Skip to content

Commit

Permalink
Replace InMemDiskArray with BaseDiskArray
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Mar 21, 2024
1 parent 2587d81 commit c390949
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 131 deletions.
72 changes: 7 additions & 65 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,78 +286,20 @@ class BaseInMemDiskArray : public BaseDiskArrayInternal {
std::vector<std::unique_ptr<uint8_t[]>> inMemArrayPages;
};

/**
* Stores an array of type U's page by page in memory, using OS memory and not the buffer manager.
* Designed currently to be used by lists headers and metadata, where we want to avoid using
* pins/unpins when accessing data through the buffer manager.
*/
class InMemDiskArrayInternal : public BaseInMemDiskArray {
public:
InMemDiskArrayInternal(FileHandle& fileHandle, DBFileID dbFileID,
common::page_idx_t headerPageIdx, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction);

static inline common::page_idx_t addDAHPageToFile(
BMFileHandle& fileHandle, BufferManager* bufferManager, WAL* wal, size_t size) {
DiskArrayHeader daHeader(size);
return DBFileUtils::insertNewPage(fileHandle, DBFileID{DBFileType::METADATA},
*bufferManager, *wal,
[&](uint8_t* frame) -> void { memcpy(frame, &daHeader, sizeof(DiskArrayHeader)); });
}

inline void checkpointInMemoryIfNecessary() override {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(true /* is checkpoint */);
}
inline void rollbackInMemoryIfNecessary() override {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(false /* is rollback */);
}

private:
void checkpointOrRollbackInMemoryIfNecessaryNoLock(bool isCheckpoint) override;
};

template<typename U>
class InMemDiskArray {
class InMemDiskArray : public BaseDiskArray<U> {
public:
// Used when loading from file
InMemDiskArray(FileHandle& fileHandle, DBFileID dbFileID, common::page_idx_t headerPageIdx,
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction)
: diskArray(fileHandle, dbFileID, headerPageIdx, bufferManager, wal, transaction) {}

inline U& operator[](uint64_t idx) { return *(U*)diskArray[idx]; }

: BaseDiskArray<U>(fileHandle, dbFileID, headerPageIdx, bufferManager, wal, transaction) {}
static inline common::page_idx_t addDAHPageToFile(
BMFileHandle& fileHandle, BufferManager* bufferManager, WAL* wal) {
return InMemDiskArrayInternal::addDAHPageToFile(fileHandle, bufferManager, wal, sizeof(U));
}

// Note: This function is to be used only by the WRITE trx.
inline void update(uint64_t idx, U val) { diskArray.update(idx, getSpan(val)); }

inline U get(uint64_t idx, transaction::TransactionType trxType) {
U val;
diskArray.get(idx, trxType, getSpan(val));
return val;
}

// Note: Currently, this function doesn't support shrinking the size of the array.
inline uint64_t resize(uint64_t newNumElements) {
U defaultVal;
return diskArray.resize(newNumElements, getSpan(defaultVal));
}

inline uint64_t getNumElements(
transaction::TransactionType trxType = transaction::TransactionType::READ_ONLY) {
return diskArray.getNumElements(trxType);
DiskArrayHeader daHeader(sizeof(U));
return DBFileUtils::insertNewPage(fileHandle, DBFileID{DBFileType::METADATA},
*bufferManager, *wal,
[&](uint8_t* frame) -> void { memcpy(frame, &daHeader, sizeof(DiskArrayHeader)); });
}

inline void checkpointInMemoryIfNecessary() { diskArray.checkpointInMemoryIfNecessary(); }
inline void rollbackInMemoryIfNecessary() { diskArray.rollbackInMemoryIfNecessary(); }
inline void prepareCommit() { diskArray.prepareCommit(); }

private:
InMemDiskArrayInternal diskArray;
};

class InMemDiskArrayBuilderInternal : public BaseInMemDiskArray {
Expand Down
66 changes: 0 additions & 66 deletions src/storage/storage_structure/disk_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,72 +357,6 @@ void BaseInMemDiskArray::readArrayPageFromFile(uint64_t apIdx, page_idx_t apPage
reinterpret_cast<uint8_t*>(this->inMemArrayPages[apIdx].get()), apPageIdx);
}

InMemDiskArrayInternal::InMemDiskArrayInternal(FileHandle& fileHandle, DBFileID dbFileID,
page_idx_t headerPageIdx, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction)
: BaseInMemDiskArray(fileHandle, dbFileID, headerPageIdx, bufferManager, wal, transaction) {}

void InMemDiskArrayInternal::checkpointOrRollbackInMemoryIfNecessaryNoLock(bool isCheckpoint) {
if (!this->hasTransactionalUpdates) {
return;
}
uint64_t numOldAPs = this->getNumAPsNoLock(TransactionType::READ_ONLY);
for (uint64_t apIdx = 0; apIdx < numOldAPs; ++apIdx) {
uint64_t apPageIdx = this->getAPPageIdxNoLock(apIdx, TransactionType::READ_ONLY);
if (ku_dynamic_cast<FileHandle&, BMFileHandle&>(this->fileHandle)
.hasWALPageVersionNoWALPageIdxLock(apPageIdx)) {
// Note we can directly read the new image from disk because the WALReplayer checkpoints
// the disk image of the page before calling
// InMemDiskArray::checkpointInMemoryIfNecessary.
if (isCheckpoint) {
this->readArrayPageFromFile(apIdx, apPageIdx);
}
this->clearWALPageVersionAndRemovePageFromFrameIfNecessary(apPageIdx);
}
}
uint64_t newNumAPs = this->getNumAPsNoLock(TransactionType::WRITE);
// When rolling back, unlike removing new PIPs in
// BaseDiskArray::checkpointOrRollbackInMemoryIfNecessaryNoLock when rolling back, we cannot
// directly truncate each page. Instead we need to keep track of the minimum apPageIdx we
// saw that we want to truncate to first. Then we call
// BaseDiskArray::checkpointOrRollbackInMemoryIfNecessaryNoLock, which can do its own
// truncation due to newly added PIPs. Then finally we truncate. The reason is this: suppose
// we added a new apIdx=1 with pageIdx 20 in the fileHandle, which suppose caused a new PIP
// to be inserted with pageIdx 21, and we further added one more new apIdx=2 with
// pageIdx 22. Now this function will loop through the newly added apIdxs, so apIdx=1 and 2
// in that order. If we directly truncate to the pageIdx of apIdx=1, which is 20, then we
// will remove 21 and 22. But then we will loop through apIdx=2 and we need to convert it to
// its pageIdx to clear its updated WAL version. But that requires reading the newly added
// PIP's WAL version, which had pageIdx 22 but no longer exists. That would lead to a seg
// fault somewhere. So we do not truncate these in order not to accidentally remove newly
// added PIPs, which we would need if we kept calling removePageIdxAndTruncateIfNecessary
// for each newly added array pages.
page_idx_t minNewAPPageIdxToTruncateTo = INVALID_PAGE_IDX;
for (uint64_t apIdx = this->header.numAPs; apIdx < newNumAPs; apIdx++) {
page_idx_t apPageIdx = this->getAPPageIdxNoLock(apIdx, TransactionType::WRITE);
if (isCheckpoint) {
this->addInMemoryArrayPageAndReadFromFile(apPageIdx);
}
this->clearWALPageVersionAndRemovePageFromFrameIfNecessary(apPageIdx);
if (!isCheckpoint) {
minNewAPPageIdxToTruncateTo = std::min(minNewAPPageIdxToTruncateTo, apPageIdx);
}
}

// TODO(Semih): Currently we do not support truncating DiskArrays. When we support that, we
// need to implement the logic to truncate InMemArrayPages as well.
// Note that the base class call sets hasTransactionalUpdates to false.
if (isCheckpoint) {
BaseDiskArrayInternal::checkpointOrRollbackInMemoryIfNecessaryNoLock(
true /* is checkpoint */);
} else {
BaseDiskArrayInternal::checkpointOrRollbackInMemoryIfNecessaryNoLock(
false /* is rollback */);
((BMFileHandle&)this->fileHandle)
.removePageIdxAndTruncateIfNecessary(minNewAPPageIdxToTruncateTo);
}
}

InMemDiskArrayBuilderInternal::InMemDiskArrayBuilderInternal(FileHandle& fileHandle,
page_idx_t headerPageIdx, uint64_t numElements, size_t elementSize, bool setToZero)
: BaseInMemDiskArray(fileHandle, headerPageIdx, elementSize) {
Expand Down

0 comments on commit c390949

Please sign in to comment.