Skip to content

Commit

Permalink
add locks for wal page idx
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 21, 2023
1 parent b5805d6 commit c50479f
Show file tree
Hide file tree
Showing 17 changed files with 275 additions and 224 deletions.
2 changes: 1 addition & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ using hash_t = uint64_t;
using page_idx_t = uint32_t;
using frame_idx_t = page_idx_t;
using page_offset_t = uint32_t;
constexpr page_idx_t PAGE_IDX_MAX = UINT32_MAX;
constexpr page_idx_t INVALID_PAGE_IDX = UINT32_MAX;
using page_group_idx_t = uint32_t;
using frame_group_idx_t = page_group_idx_t;
using list_header_t = uint32_t;
Expand Down
75 changes: 57 additions & 18 deletions src/include/storage/buffer_manager/bm_file_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,42 @@ class PageState {
std::atomic<uint64_t> evictionTimestamp = 0;
};

// This class is used to keep the WAL page idxes of a page group in the original file handle.
// For each original page in the page group, it has a corresponding WAL page idx. The WAL page idx
// is initialized as INVALID_PAGE_IDX to indicate that the page does not have any updates in WAL
// file. To synchronize accesses to the WAL version page idxes, we use a lock for each WAL page idx.
class WALPageIdxGroup {
public:
WALPageIdxGroup();

inline void acquireWALPageIdxLock(common::page_idx_t originalPageIdxInGroup) {
walPageIdxLocks[originalPageIdxInGroup]->lock();
}
inline void releaseWALPageIdxLock(common::page_idx_t originalPageIdxInGroup) {
walPageIdxLocks[originalPageIdxInGroup]->unlock();
}
inline common::page_idx_t getWALVersionPageIdxNoLock(common::page_idx_t pageIdxInGroup) const {
return walPageIdxes[pageIdxInGroup];
}
inline void setWALVersionPageIdxNoLock(
common::page_idx_t pageIdxInGroup, common::page_idx_t walVersionPageIdx) {
walPageIdxes[pageIdxInGroup] = walVersionPageIdx;
}

private:
std::vector<common::page_idx_t> walPageIdxes;
std::vector<std::unique_ptr<std::mutex>> walPageIdxLocks;
};

// BMFileHandle is a file handle that is backed by BufferManager. It holds the state of
// each page in the file. File Handle is the bridge between a Column/Lists/Index and the Buffer
// Manager that abstracts the file in which that Column/Lists/Index is stored.
// BMFileHandle supports two types of files: versioned and non-versioned. Versioned files
// contains mapping from pages that have updates to the versioned pages in the wal file.
// Currently, only MemoryManager and WAL files are non-versioned.
class BMFileHandle : public FileHandle {
friend class BufferManager;

public:
enum class FileVersionedType : uint8_t {
VERSIONED_FILE = 0, // The file is backed by versioned pages in wal file.
Expand All @@ -62,26 +91,30 @@ class BMFileHandle : public FileHandle {
BMFileHandle(const std::string& path, uint8_t flags, BufferManager* bm,
common::PageSizeClass pageSizeClass, FileVersionedType fileVersionedType);

void createPageVersionGroupIfNecessary(common::page_idx_t pageIdx);

common::page_group_idx_t addWALPageIdxGroupIfNecessary(common::page_idx_t originalPageIdx);
// This function is intended to be used after a fileInfo is created and we want the file
// to have not pages and page locks. Should be called after ensuring that the buffer manager
// to have no pages and page locks. Should be called after ensuring that the buffer manager
// does not hold any of the pages of the file.
void resetToZeroPagesAndPageCapacity();
void removePageIdxAndTruncateIfNecessary(common::page_idx_t pageIdx);

bool hasWALPageVersionNoPageLock(common::page_idx_t pageIdx);
void clearWALPageVersionIfNecessary(common::page_idx_t pageIdx);
common::page_idx_t getWALPageVersionNoPageLock(common::page_idx_t pageIdx);
void setWALPageVersion(common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL);
void setWALPageVersionNoLock(common::page_idx_t pageIdx, common::page_idx_t pageVersion);
void acquireWALPageIdxLock(common::page_idx_t originalPageIdx);
void releaseWALPageIdxLock(common::page_idx_t originalPageIdx);

inline bool acquirePageLock(common::page_idx_t pageIdx, LockMode lockMode) {
return getPageState(pageIdx)->acquireLock(lockMode);
}
inline void releasePageLock(common::page_idx_t pageIdx) {
getPageState(pageIdx)->releaseLock();
}
// Return true if the original page's page group has a WAL page group.
bool hasWALPageGroup(common::page_group_idx_t originalPageIdx);

// This function assumes that the caller has already acquired the wal page idx lock.
// Return true if the page has a WAL page idx, whose value is not equal to INVALID_PAGE_IDX.
bool hasWALPageVersionNoWALPageIdxLock(common::page_idx_t originalPageIdx);

void clearWALPageIdxIfNecessary(common::page_idx_t originalPageIdx);
common::page_idx_t getWALPageIdxNoWALPageIdxLock(common::page_idx_t originalPageIdx);
void setWALPageIdx(common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL);
// This function assumes that the caller has already acquired the wal page idx lock.
void setWALPageIdxNoLock(common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL);

private:
inline PageState* getPageState(common::page_idx_t pageIdx) {
assert(pageIdx < numPages && pageStates[pageIdx]);
return pageStates[pageIdx].get();
Expand All @@ -90,6 +123,12 @@ class BMFileHandle : public FileHandle {
assert(pageIdx < numPages && pageStates[pageIdx]);
pageStates[pageIdx]->resetState();
}
inline bool acquirePageLock(common::page_idx_t pageIdx, LockMode lockMode) {
return getPageState(pageIdx)->acquireLock(lockMode);
}
inline void releasePageLock(common::page_idx_t pageIdx) {
getPageState(pageIdx)->releaseLock();
}
inline common::frame_idx_t getFrameIdx(common::page_idx_t pageIdx) {
assert(pageIdx < pageCapacity);
return (frameGroupIdxes[pageIdx >> common::StorageConstants::PAGE_GROUP_SIZE_LOG2]
Expand All @@ -98,11 +137,9 @@ class BMFileHandle : public FileHandle {
}
inline common::PageSizeClass getPageSizeClass() const { return pageSizeClass; }

private:
void initPageStatesAndGroups();
common::page_idx_t addNewPageWithoutLock() override;
void addNewPageGroupWithoutLock();
void removePageIdxAndTruncateIfNecessaryWithoutLock(common::page_idx_t pageIdxToRemove);
inline common::page_group_idx_t getNumPageGroups() {
return ceil((double)numPages / common::StorageConstants::PAGE_GROUP_SIZE);
}
Expand All @@ -114,8 +151,10 @@ class BMFileHandle : public FileHandle {
std::vector<std::unique_ptr<PageState>> pageStates;
// Each file page group corresponds to a frame group in the VMRegion.
std::vector<common::page_group_idx_t> frameGroupIdxes;
std::vector<std::vector<common::page_idx_t>> pageVersions;
std::vector<std::unique_ptr<std::atomic_flag>> pageGroupLocks;
// For each page group, if it has any WAL page version, we keep a `WALPageIdxGroup` in this map.
// `WALPageIdxGroup` records the WAL page idx for each page in the page group.
// Accesses to this map is synchronized by `fhSharedMutex`.
std::unordered_map<common::page_group_idx_t, std::unique_ptr<WALPageIdxGroup>> walPageIdxGroups;
};
} // namespace storage
} // namespace kuzu
3 changes: 0 additions & 3 deletions src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,11 @@ class BufferManager {

uint8_t* pin(BMFileHandle& fileHandle, common::page_idx_t pageIdx,
PageReadPolicy pageReadPolicy = PageReadPolicy::READ_PAGE);
uint8_t* pinWithoutAcquiringPageLock(
BMFileHandle& fileHandle, common::page_idx_t pageIdx, PageReadPolicy pageReadPolicy);

void setPinnedPageDirty(BMFileHandle& fileHandle, common::page_idx_t pageIdx);

// The function assumes that the requested page is already pinned.
void unpin(BMFileHandle& fileHandle, common::page_idx_t pageIdx);
void unpinWithoutAcquiringPageLock(BMFileHandle& fileHandle, common::page_idx_t pageIdx);

void removeFilePagesFromFrames(BMFileHandle& fileHandle);
void flushAllDirtyPagesInFrames(BMFileHandle& fileHandle);
Expand Down
9 changes: 5 additions & 4 deletions src/include/storage/storage_structure/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ class Column : public BaseColumnOrList {

void writeValues(common::ValueVector* nodeIDVector, common::ValueVector* vectorToWriteFrom);

// Currently, used only in CopyCSV tests.
virtual common::Value readValue(common::offset_t offset);
bool isNull(common::offset_t nodeOffset, transaction::Transaction* transaction);
void setNodeOffsetToNull(common::offset_t nodeOffset);

// Currently, used only in CopyCSV tests.
virtual common::Value readValueForTestingOnly(common::offset_t offset);

protected:
void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector, uint32_t vectorPos);
Expand Down Expand Up @@ -107,7 +108,7 @@ class StringPropertyColumn : public PropertyColumnWithOverflow {
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override;

// Currently, used only in CopyCSV tests.
common::Value readValue(common::offset_t offset) override;
common::Value readValueForTestingOnly(common::offset_t offset) override;

private:
inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
Expand Down Expand Up @@ -141,7 +142,7 @@ class ListPropertyColumn : public PropertyColumnWithOverflow {
void writeValueForSingleNodeIDPosition(common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override;

common::Value readValue(common::offset_t offset) override;
common::Value readValueForTestingOnly(common::offset_t offset) override;

private:
inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct WALPageIdxPosInPageAndFrame : WALPageIdxAndFrame {

class StorageStructureUtils {
public:
constexpr static common::page_idx_t NULL_PAGE_IDX = common::PAGE_IDX_MAX;
constexpr static common::page_idx_t NULL_PAGE_IDX = common::INVALID_PAGE_IDX;
constexpr static uint32_t NULL_CHUNK_OR_LARGE_LIST_HEAD_IDX = UINT32_MAX;

public:
Expand Down
Loading

0 comments on commit c50479f

Please sign in to comment.