Use a lockfree data structure to store page states
benjaminwinger committed May 1, 2024
1 parent 88cc154 commit b79aeb3
Showing 3 changed files with 128 additions and 22 deletions.
106 changes: 106 additions & 0 deletions src/include/common/concurrent_vector.h
@@ -0,0 +1,106 @@
#pragma once

#include <array>
#include <cstdint>
#include <memory>
#include <vector>

#include "common/assert.h"
#include "common/constants.h"

namespace kuzu {
namespace common {

template<typename T, uint64_t BLOCK_SIZE = DEFAULT_VECTOR_CAPACITY,
uint64_t INDEX_SIZE = BLOCK_SIZE>
// Vector which doesn't move its elements when resizing.
// The initial block size is fixed, and new elements are added in fixed-size blocks which are
// indexed, with the indices chained in a linked list. Currently only one thread can write at a
// time, but any number of threads can read, even while the vector is being written to.
//
// Accessing elements which existed when the vector was created is as fast as possible, requiring
// just one comparison and one pointer read. Accessing newer elements is still reasonably fast,
// usually requiring just two pointer reads plus a small amount of arithmetic, or slightly more if
// an extremely large number of elements has been added
// (access cost increases every BLOCK_SIZE * INDEX_SIZE elements).
class ConcurrentVector {
public:
explicit ConcurrentVector(uint64_t initialNumElements, uint64_t initialBlockSize)
: numElements{initialNumElements}, initialBlock{std::make_unique<T[]>(initialBlockSize)},
initialBlockSize{initialBlockSize}, firstIndex{nullptr} {}
// resize never deallocates memory.
// Not thread-safe.
// It could be made thread-safe by storing the size atomically and doing a compare-and-swap
// when adding new indices and blocks.
void resize(uint64_t newSize) {
while (newSize > initialBlockSize + blocks.size() * BLOCK_SIZE) {
auto newBlock = std::make_unique<Block>();
if (indices.empty()) {
auto index = std::make_unique<BlockIndex>();
index->blocks[0] = newBlock.get();
index->numBlocks = 1;
firstIndex = index.get();
indices.push_back(std::move(index));
} else if (indices.back()->numBlocks < INDEX_SIZE) {
auto& index = indices.back();
index->blocks[index->numBlocks] = newBlock.get();
index->numBlocks++;
} else {
KU_ASSERT(indices.back()->numBlocks == INDEX_SIZE);
auto index = std::make_unique<BlockIndex>();
index->blocks[0] = newBlock.get();
index->numBlocks = 1;
indices.back()->nextIndex = index.get();
indices.push_back(std::move(index));
}
blocks.push_back(std::move(newBlock));
}
numElements = newSize;
}

void push_back(T&& value) {
auto index = numElements;
resize(numElements + 1);
(*this)[index] = std::move(value);
}

T& operator[](uint64_t elemPos) {
if (elemPos < initialBlockSize) {
return initialBlock[elemPos];
} else {
auto blockNum = (elemPos - initialBlockSize) / BLOCK_SIZE;
auto posInBlock = (elemPos - initialBlockSize) % BLOCK_SIZE;
auto indexNum = blockNum / INDEX_SIZE;
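// Each BlockIndex holds INDEX_SIZE block pointers, so indexNum counts how many
// links of the index chain to follow before blocks[blockNum % INDEX_SIZE] in the
// right index can be read directly.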
BlockIndex* index = firstIndex;
KU_ASSERT(index != nullptr);
while (indexNum > 0) {
KU_ASSERT(index->nextIndex != nullptr);
index = index->nextIndex;
indexNum--;
}
return index->blocks[blockNum % INDEX_SIZE]->data[posInBlock];
}
}

uint64_t size() { return numElements; }

private:
uint64_t numElements;
std::unique_ptr<T[]> initialBlock;
uint64_t initialBlockSize;
struct Block {
std::array<T, BLOCK_SIZE> data;
};
struct BlockIndex {
BlockIndex() : nextIndex{nullptr}, blocks{}, numBlocks{0} {}
BlockIndex* nextIndex;
std::array<Block*, INDEX_SIZE> blocks;
uint64_t numBlocks;
};
BlockIndex* firstIndex;
std::vector<std::unique_ptr<Block>> blocks;
std::vector<std::unique_ptr<BlockIndex>> indices;
};

} // namespace common
} // namespace kuzu
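
A minimal usage sketch (not part of the commit) of the new container, illustrating the property the buffer manager relies on: a reference obtained from operator[] stays valid while another thread grows the vector, because existing blocks are never moved or reallocated. The Counter element type and the block sizes below are made up for illustration.

#include "common/concurrent_vector.h"

#include <cstdint>
#include <thread>

using kuzu::common::ConcurrentVector;

struct Counter {
    uint64_t value = 0;
};

int main() {
    // Initial block sized for 1024 elements; later growth happens in
    // BLOCK_SIZE-element blocks chained behind BlockIndex nodes.
    ConcurrentVector<Counter, 1024 /*BLOCK_SIZE*/> counters{/*initialNumElements=*/0,
        /*initialBlockSize=*/1024};
    counters.resize(1024);

    // The address of an existing element is stable across later resizes.
    Counter* stable = &counters[42];

    // A single writer may grow the vector while readers keep using old references.
    std::thread writer([&] { counters.resize(100'000); });
    stable->value = 7; // still valid: the initial block is never reallocated
    writer.join();

    return stable->value == 7 ? 0 : 1;
}
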
23 changes: 16 additions & 7 deletions src/include/storage/buffer_manager/bm_file_handle.h
@@ -2,7 +2,11 @@

#include <atomic>
#include <cmath>
#include <cstdint>

#include "common/concurrent_vector.h"
#include "common/constants.h"
#include "common/types/types.h"
#include "storage/buffer_manager/vm_region.h"
#include "storage/file_handle.h"

@@ -147,7 +151,7 @@ class BMFileHandle : public FileHandle {
// This function assumes the page is already LOCKED.
inline void setLockedPageDirty(common::page_idx_t pageIdx) {
KU_ASSERT(pageIdx < numPages);
pageStates[pageIdx]->setDirty();
pageStates[pageIdx].setDirty();
}

common::page_group_idx_t addWALPageIdxGroupIfNecessary(common::page_idx_t originalPageIdx);
@@ -175,9 +179,8 @@

private:
inline PageState* getPageState(common::page_idx_t pageIdx) {
std::shared_lock sLck{fhSharedMutex};
KU_ASSERT(pageIdx < numPages && pageStates[pageIdx]);
return pageStates[pageIdx].get();
KU_ASSERT(pageIdx < numPages);
return &pageStates[pageIdx];
}
inline common::frame_idx_t getFrameIdx(common::page_idx_t pageIdx) {
KU_ASSERT(pageIdx < pageCapacity);
@@ -187,7 +190,6 @@
}
inline common::PageSizeClass getPageSizeClass() const { return pageSizeClass; }

void initPageStatesAndGroups();
common::page_idx_t addNewPageWithoutLock() override;
void addNewPageGroupWithoutLock();
inline common::page_group_idx_t getNumPageGroups() {
@@ -198,9 +200,16 @@
FileVersionedType fileVersionedType;
BufferManager* bm;
common::PageSizeClass pageSizeClass;
std::vector<std::unique_ptr<PageState>> pageStates;
// With a page group size of 2^10 and a 256KB index size, the access cost increases
// only with each 128GB added to the file.
common::ConcurrentVector<PageState, common::StorageConstants::PAGE_GROUP_SIZE,
common::BufferPoolConstants::PAGE_256KB_SIZE / sizeof(void*)>
pageStates;
// Each file page group corresponds to a frame group in the VMRegion.
std::vector<common::page_group_idx_t> frameGroupIdxes;
// There is just one frame group for each page group, so performance is less sensitive here
// than for pageStates; this is left at the default block size, which won't increase access
// cost for the frame groups until 16TB of data has been written.
common::ConcurrentVector<common::page_group_idx_t> frameGroupIdxes;
// For each page group, if it has any WAL page version, we keep a `WALPageIdxGroup` in this map.
// `WALPageIdxGroup` records the WAL page idx for each page in the page group.
// Accesses to this map are synchronized by `fhSharedMutex`.
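
A quick sanity check (not part of the commit) of the 128GB and 16TB figures in the comments above. It assumes 4KB data pages, 8-byte pointers, and a DEFAULT_VECTOR_CAPACITY of 2048; the constants are restated locally for illustration, so treat this as a sketch of the arithmetic rather than the authoritative values from common/constants.h.

#include <cstdint>
#include <cstdio>

int main() {
    // Assumed constants, restated here for illustration only.
    constexpr uint64_t pageSize = 4096;                          // 4KB data pages
    constexpr uint64_t pageGroupSize = 1ull << 10;               // pages per page group (BLOCK_SIZE)
    constexpr uint64_t indexSize = (256 * 1024) / sizeof(void*); // 256KB of block pointers -> 32768

    // pageStates: one BlockIndex covers BLOCK_SIZE * INDEX_SIZE page states,
    // i.e. 2^10 * 2^15 = 2^25 pages of 4KB each, so one extra index hop per 128GB of file.
    constexpr uint64_t bytesPerPageStateIndex = pageGroupSize * indexSize * pageSize;
    std::printf("pageStates: +1 index hop every %llu GB\n",
        static_cast<unsigned long long>(bytesPerPageStateIndex >> 30));

    // frameGroupIdxes: one entry per page group, using the default
    // BLOCK_SIZE == INDEX_SIZE == DEFAULT_VECTOR_CAPACITY (assumed to be 2048).
    constexpr uint64_t defaultVectorCapacity = 2048;
    constexpr uint64_t bytesPerFrameGroupIndex =
        defaultVectorCapacity * defaultVectorCapacity * pageGroupSize * pageSize;
    std::printf("frameGroupIdxes: +1 index hop every %llu TB\n",
        static_cast<unsigned long long>(bytesPerFrameGroupIndex >> 40));
    return 0;
}
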
21 changes: 6 additions & 15 deletions src/storage/buffer_manager/bm_file_handle.cpp
@@ -20,31 +20,22 @@ BMFileHandle::BMFileHandle(const std::string& path, uint8_t flags, BufferManager
PageSizeClass pageSizeClass, FileVersionedType fileVersionedType,
common::VirtualFileSystem* vfs)
: FileHandle{path, flags, vfs}, fileVersionedType{fileVersionedType}, bm{bm},
pageSizeClass{pageSizeClass} {
initPageStatesAndGroups();
pageSizeClass{pageSizeClass}, pageStates{numPages, pageCapacity},
frameGroupIdxes{getNumPageGroups(), getNumPageGroups()} {
for (auto i = 0u; i < frameGroupIdxes.size(); i++) {
frameGroupIdxes[i] = bm->addNewFrameGroup(pageSizeClass);
}
}

BMFileHandle::~BMFileHandle() {
bm->removeFilePagesFromFrames(*this);
}

void BMFileHandle::initPageStatesAndGroups() {
pageStates.resize(pageCapacity);
for (auto i = 0ull; i < numPages; i++) {
pageStates[i] = std::make_unique<PageState>();
}
auto numPageGroups = getNumPageGroups();
frameGroupIdxes.resize(numPageGroups);
for (auto i = 0u; i < numPageGroups; i++) {
frameGroupIdxes[i] = bm->addNewFrameGroup(pageSizeClass);
}
}

page_idx_t BMFileHandle::addNewPageWithoutLock() {
if (numPages == pageCapacity) {
addNewPageGroupWithoutLock();
}
pageStates[numPages] = std::make_unique<PageState>();
pageStates[numPages].resetToEvicted();
return numPages++;
}
