Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a lockfree data structure to store page states #3425

Merged
merged 2 commits into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ add_subdirectory(third_party)
if(${BUILD_KUZU})
add_definitions(-DKUZU_ROOT_DIRECTORY="${PROJECT_SOURCE_DIR}")
add_definitions(-DKUZU_CMAKE_VERSION="${CMAKE_PROJECT_VERSION}")
add_definitions(-DKUZU_EXTENSION_VERSION="0.3.0")
add_definitions(-DKUZU_EXTENSION_VERSION="0.3.1")

include_directories(src/include)

Expand Down
108 changes: 108 additions & 0 deletions src/include/common/concurrent_vector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#pragma once

#include <array>
#include <cstdint>
#include <memory>
#include <vector>

#include "common/assert.h"
#include "common/constants.h"

namespace kuzu {
namespace common {

template<typename T, uint64_t BLOCK_SIZE = DEFAULT_VECTOR_CAPACITY,
uint64_t INDEX_SIZE = BLOCK_SIZE>
// Vector which doesn't move when resizing
// The initial size is fixed, and new elements are added in fixed sized blocks which are indexed and
// the indices are chained in a linked list. Currently only one thread can write concurrently, but
// any number of threads can read, even when the vector is being written to.
//
// Accessing elements which existed when the vector was created is as fast as possible, requiring
// just one comparison and one pointer reads, and accessing new elements is still reasonably fast,
// usually requiring reading just two pointers, with a small amount of arithmetic, or maybe more if
// an extremely large number of elements has been added
// (access cost increases every BLOCK_SIZE * INDEX_SIZE elements).
class ConcurrentVector {
public:
explicit ConcurrentVector(uint64_t initialNumElements, uint64_t initialBlockSize)
: numElements{initialNumElements}, initialBlock{std::make_unique<T[]>(initialBlockSize)},
initialBlockSize{initialBlockSize}, firstIndex{nullptr} {}
// resize never deallocates memory
// Not thread-safe
// It could be made to be thread-safe by storing the size atomically and doing compare and swap
// when adding new indices and blocks
void resize(uint64_t newSize) {
while (newSize > initialBlockSize + blocks.size() * BLOCK_SIZE) {
auto newBlock = std::make_unique<Block>();
if (indices.empty()) {
auto index = std::make_unique<BlockIndex>();
index->blocks[0] = newBlock.get();
index->numBlocks = 1;
firstIndex = index.get();
indices.push_back(std::move(index));
} else if (indices.back()->numBlocks < INDEX_SIZE) {
auto& index = indices.back();
index->blocks[index->numBlocks] = newBlock.get();
index->numBlocks++;
} else {
KU_ASSERT(indices.back()->numBlocks == INDEX_SIZE);
auto index = std::make_unique<BlockIndex>();
index->blocks[0] = newBlock.get();
index->numBlocks = 1;
indices.back()->nextIndex = index.get();
indices.push_back(std::move(index));
}
blocks.push_back(std::move(newBlock));
}
numElements = newSize;
}

void push_back(T&& value) {
auto index = numElements;
resize(numElements + 1);
(*this)[index] = std::move(value);
}

T& operator[](uint64_t elemPos) {
if (elemPos < initialBlockSize) {
KU_ASSERT(initialBlock);
return initialBlock[elemPos];
} else {
auto blockNum = (elemPos - initialBlockSize) / BLOCK_SIZE;
auto posInBlock = (elemPos - initialBlockSize) % BLOCK_SIZE;
auto indexNum = blockNum / INDEX_SIZE;
BlockIndex* index = firstIndex;
KU_ASSERT(index != nullptr);
while (indexNum > 0) {
KU_ASSERT(index->nextIndex != nullptr);
index = index->nextIndex;
indexNum--;
}
KU_ASSERT(index->blocks[blockNum % INDEX_SIZE] != nullptr);
return index->blocks[blockNum % INDEX_SIZE]->data[posInBlock];
}
}

uint64_t size() { return numElements; }

private:
uint64_t numElements;
std::unique_ptr<T[]> initialBlock;
uint64_t initialBlockSize;
struct Block {
std::array<T, BLOCK_SIZE> data;
};
struct BlockIndex {
BlockIndex() : nextIndex{nullptr}, blocks{}, numBlocks{0} {}
BlockIndex* nextIndex;
std::array<Block*, INDEX_SIZE> blocks;
uint64_t numBlocks;
};
BlockIndex* firstIndex;
std::vector<std::unique_ptr<Block>> blocks;
std::vector<std::unique_ptr<BlockIndex>> indices;
};

} // namespace common
} // namespace kuzu
23 changes: 16 additions & 7 deletions src/include/storage/buffer_manager/bm_file_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

#include <atomic>
#include <cmath>
#include <cstdint>

#include "common/concurrent_vector.h"
#include "common/constants.h"
#include "common/types/types.h"
#include "storage/buffer_manager/vm_region.h"
#include "storage/file_handle.h"

Expand Down Expand Up @@ -150,7 +154,7 @@ class BMFileHandle : public FileHandle {
// This function assumes the page is already LOCKED.
inline void setLockedPageDirty(common::page_idx_t pageIdx) {
KU_ASSERT(pageIdx < numPages);
pageStates[pageIdx]->setDirty();
pageStates[pageIdx].setDirty();
}

common::page_group_idx_t addWALPageIdxGroupIfNecessary(common::page_idx_t originalPageIdx);
Expand Down Expand Up @@ -178,9 +182,8 @@ class BMFileHandle : public FileHandle {

private:
inline PageState* getPageState(common::page_idx_t pageIdx) {
std::shared_lock sLck{fhSharedMutex};
KU_ASSERT(pageIdx < numPages && pageStates[pageIdx]);
return pageStates[pageIdx].get();
KU_ASSERT(pageIdx < numPages);
return &pageStates[pageIdx];
}
inline common::frame_idx_t getFrameIdx(common::page_idx_t pageIdx) {
KU_ASSERT(pageIdx < pageCapacity);
Expand All @@ -190,7 +193,6 @@ class BMFileHandle : public FileHandle {
}
inline common::PageSizeClass getPageSizeClass() const { return pageSizeClass; }

void initPageStatesAndGroups();
common::page_idx_t addNewPageWithoutLock() override;
void addNewPageGroupWithoutLock();
inline common::page_group_idx_t getNumPageGroups() {
Expand All @@ -201,9 +203,16 @@ class BMFileHandle : public FileHandle {
FileVersionedType fileVersionedType;
BufferManager* bm;
common::PageSizeClass pageSizeClass;
std::vector<std::unique_ptr<PageState>> pageStates;
// With a page group size of 2^10 and an 256KB index size, the access cost increases
// only with each 128GB added to the file
common::ConcurrentVector<PageState, common::StorageConstants::PAGE_GROUP_SIZE,
common::BufferPoolConstants::PAGE_256KB_SIZE / sizeof(void*)>
pageStates;
// Each file page group corresponds to a frame group in the VMRegion.
std::vector<common::page_group_idx_t> frameGroupIdxes;
// Just one frame group for each page group, so performance is less sensitive than pageStates
// and left at the default which won't increase access cost for the frame groups until 16TB of
// data has been written
common::ConcurrentVector<common::page_group_idx_t> frameGroupIdxes;
// For each page group, if it has any WAL page version, we keep a `WALPageIdxGroup` in this map.
// `WALPageIdxGroup` records the WAL page idx for each page in the page group.
// Accesses to this map is synchronized by `fhSharedMutex`.
Expand Down
21 changes: 6 additions & 15 deletions src/storage/buffer_manager/bm_file_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,22 @@ BMFileHandle::BMFileHandle(const std::string& path, uint8_t flags, BufferManager
PageSizeClass pageSizeClass, FileVersionedType fileVersionedType,
common::VirtualFileSystem* vfs)
: FileHandle{path, flags, vfs}, fileVersionedType{fileVersionedType}, bm{bm},
pageSizeClass{pageSizeClass} {
initPageStatesAndGroups();
pageSizeClass{pageSizeClass}, pageStates{numPages, pageCapacity},
frameGroupIdxes{getNumPageGroups(), getNumPageGroups()} {
for (auto i = 0u; i < frameGroupIdxes.size(); i++) {
frameGroupIdxes[i] = bm->addNewFrameGroup(pageSizeClass);
}
}

BMFileHandle::~BMFileHandle() {
bm->removeFilePagesFromFrames(*this);
}

void BMFileHandle::initPageStatesAndGroups() {
pageStates.resize(pageCapacity);
for (auto i = 0ull; i < numPages; i++) {
pageStates[i] = std::make_unique<PageState>();
}
auto numPageGroups = getNumPageGroups();
frameGroupIdxes.resize(numPageGroups);
for (auto i = 0u; i < numPageGroups; i++) {
frameGroupIdxes[i] = bm->addNewFrameGroup(pageSizeClass);
}
}

page_idx_t BMFileHandle::addNewPageWithoutLock() {
if (numPages == pageCapacity) {
addNewPageGroupWithoutLock();
}
pageStates[numPages] = std::make_unique<PageState>();
pageStates[numPages].resetToEvicted();
return numPages++;
}

Expand Down
Loading