diff --git a/CMakeLists.txt b/CMakeLists.txt
index f6a2598f2d..4bf5ccf412 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -217,7 +217,7 @@ add_subdirectory(third_party)
 if(${BUILD_KUZU})
     add_definitions(-DKUZU_ROOT_DIRECTORY="${PROJECT_SOURCE_DIR}")
     add_definitions(-DKUZU_CMAKE_VERSION="${CMAKE_PROJECT_VERSION}")
-    add_definitions(-DKUZU_EXTENSION_VERSION="0.3.0")
+    add_definitions(-DKUZU_EXTENSION_VERSION="0.3.1")
     include_directories(src/include)
diff --git a/src/include/common/concurrent_vector.h b/src/include/common/concurrent_vector.h
new file mode 100644
index 0000000000..a6448b89ee
--- /dev/null
+++ b/src/include/common/concurrent_vector.h
@@ -0,0 +1,108 @@
+#pragma once
+
+#include <array>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "common/assert.h"
+#include "common/constants.h"
+
+namespace kuzu {
+namespace common {
+
+// NOTE(review): template defaults reconstructed from the "page group size of 2^10 /
+// 256KB index size" comment in bm_file_handle.h — confirm against common/constants.h.
+template<typename T, uint64_t BLOCK_SIZE = StorageConstants::PAGE_GROUP_SIZE,
+    uint64_t INDEX_SIZE = BufferPoolConstants::PAGE_256KB_SIZE / sizeof(void*)>
+// Vector which doesn't move when resizing
+// The initial size is fixed, and new elements are added in fixed sized blocks which are indexed and
+// the indices are chained in a linked list. Currently only one thread can write concurrently, but
+// any number of threads can read, even when the vector is being written to.
+//
+// Accessing elements which existed when the vector was created is as fast as possible, requiring
+// just one comparison and one pointer read, and accessing new elements is still reasonably fast,
+// usually requiring reading just two pointers, with a small amount of arithmetic, or maybe more if
+// an extremely large number of elements has been added
+// (access cost increases every BLOCK_SIZE * INDEX_SIZE elements).
+class ConcurrentVector {
+public:
+    explicit ConcurrentVector(uint64_t initialNumElements, uint64_t initialBlockSize)
+        : numElements{initialNumElements}, initialBlock{std::make_unique<T[]>(initialBlockSize)},
+          initialBlockSize{initialBlockSize}, firstIndex{nullptr} {}
+    // resize never deallocates memory
+    // Not thread-safe
+    // It could be made to be thread-safe by storing the size atomically and doing compare and swap
+    // when adding new indices and blocks
+    void resize(uint64_t newSize) {
+        while (newSize > initialBlockSize + blocks.size() * BLOCK_SIZE) {
+            auto newBlock = std::make_unique<Block>();
+            if (indices.empty()) {
+                auto index = std::make_unique<BlockIndex>();
+                index->blocks[0] = newBlock.get();
+                index->numBlocks = 1;
+                firstIndex = index.get();
+                indices.push_back(std::move(index));
+            } else if (indices.back()->numBlocks < INDEX_SIZE) {
+                auto& index = indices.back();
+                index->blocks[index->numBlocks] = newBlock.get();
+                index->numBlocks++;
+            } else {
+                KU_ASSERT(indices.back()->numBlocks == INDEX_SIZE);
+                auto index = std::make_unique<BlockIndex>();
+                index->blocks[0] = newBlock.get();
+                index->numBlocks = 1;
+                indices.back()->nextIndex = index.get();
+                indices.push_back(std::move(index));
+            }
+            blocks.push_back(std::move(newBlock));
+        }
+        numElements = newSize;
+    }
+
+    void push_back(T&& value) {
+        auto index = numElements;
+        resize(numElements + 1);
+        (*this)[index] = std::move(value);
+    }
+
+    T& operator[](uint64_t elemPos) {
+        if (elemPos < initialBlockSize) {
+            KU_ASSERT(initialBlock);
+            return initialBlock[elemPos];
+        } else {
+            auto blockNum = (elemPos - initialBlockSize) / BLOCK_SIZE;
+            auto posInBlock = (elemPos - initialBlockSize) % BLOCK_SIZE;
+            auto indexNum = blockNum / INDEX_SIZE;
+            BlockIndex* index = firstIndex;
+            KU_ASSERT(index != nullptr);
+            while (indexNum > 0) {
+                KU_ASSERT(index->nextIndex != nullptr);
+                index = index->nextIndex;
+                indexNum--;
+            }
+            KU_ASSERT(index->blocks[blockNum % INDEX_SIZE] != nullptr);
+            return index->blocks[blockNum % INDEX_SIZE]->data[posInBlock];
+        }
+    }
+
+    uint64_t size() { return numElements; }
+
+private:
+    uint64_t numElements;
+    std::unique_ptr<T[]> initialBlock;
+    uint64_t initialBlockSize;
+    struct Block {
+        std::array<T, BLOCK_SIZE> data;
+    };
+    struct BlockIndex {
+        BlockIndex() : nextIndex{nullptr}, blocks{}, numBlocks{0} {}
+        BlockIndex* nextIndex;
+        std::array<Block*, INDEX_SIZE> blocks;
+        uint64_t numBlocks;
+    };
+    BlockIndex* firstIndex;
+    std::vector<std::unique_ptr<Block>> blocks;
+    std::vector<std::unique_ptr<BlockIndex>> indices;
+};
+
+} // namespace common
+} // namespace kuzu
diff --git a/src/include/storage/buffer_manager/bm_file_handle.h b/src/include/storage/buffer_manager/bm_file_handle.h
index c13b3d64b6..f0d73fba30 100644
--- a/src/include/storage/buffer_manager/bm_file_handle.h
+++ b/src/include/storage/buffer_manager/bm_file_handle.h
@@ -2,7 +2,11 @@
 #include <atomic>
 #include <shared_mutex>
+#include <cstdint>
+#include "common/concurrent_vector.h"
+#include "common/constants.h"
+#include "common/types/types.h"
 #include "storage/buffer_manager/vm_region.h"
 #include "storage/file_handle.h"
@@ -150,7 +154,7 @@ class BMFileHandle : public FileHandle {
     // This function assumes the page is already LOCKED.
     inline void setLockedPageDirty(common::page_idx_t pageIdx) {
         KU_ASSERT(pageIdx < numPages);
-        pageStates[pageIdx]->setDirty();
+        pageStates[pageIdx].setDirty();
     }
 
     common::page_group_idx_t addWALPageIdxGroupIfNecessary(common::page_idx_t originalPageIdx);
@@ -178,9 +182,8 @@ class BMFileHandle : public FileHandle {
 private:
     inline PageState* getPageState(common::page_idx_t pageIdx) {
-        std::shared_lock sLck{fhSharedMutex};
-        KU_ASSERT(pageIdx < numPages && pageStates[pageIdx]);
-        return pageStates[pageIdx].get();
+        KU_ASSERT(pageIdx < numPages);
+        return &pageStates[pageIdx];
     }
     inline common::frame_idx_t getFrameIdx(common::page_idx_t pageIdx) {
         KU_ASSERT(pageIdx < pageCapacity);
@@ -190,7 +193,6 @@ class BMFileHandle : public FileHandle {
     }
     inline common::PageSizeClass getPageSizeClass() const { return pageSizeClass; }
 
-    void initPageStatesAndGroups();
     common::page_idx_t addNewPageWithoutLock() override;
     void addNewPageGroupWithoutLock();
     inline common::page_group_idx_t getNumPageGroups() {
@@ -201,9 +203,16 @@ class BMFileHandle : public FileHandle {
     FileVersionedType fileVersionedType;
     BufferManager* bm;
     common::PageSizeClass pageSizeClass;
-    std::vector<std::unique_ptr<PageState>> pageStates;
+    // With a page group size of 2^10 and a 256KB index size, the access cost increases
+    // only with each 128GB added to the file
+    common::ConcurrentVector<PageState, common::StorageConstants::PAGE_GROUP_SIZE,
+        common::BufferPoolConstants::PAGE_256KB_SIZE / sizeof(void*)>
+        pageStates;
     // Each file page group corresponds to a frame group in the VMRegion.
-    std::vector<common::frame_group_idx_t> frameGroupIdxes;
+    // Just one frame group for each page group, so performance is less sensitive than pageStates
+    // and left at the default which won't increase access cost for the frame groups until 16TB of
+    // data has been written
+    common::ConcurrentVector<common::frame_group_idx_t> frameGroupIdxes;
     // For each page group, if it has any WAL page version, we keep a `WALPageIdxGroup` in this map.
     // `WALPageIdxGroup` records the WAL page idx for each page in the page group.
     // Accesses to this map is synchronized by `fhSharedMutex`.
diff --git a/src/storage/buffer_manager/bm_file_handle.cpp b/src/storage/buffer_manager/bm_file_handle.cpp
index 6efacba2f3..f0229ab98c 100644
--- a/src/storage/buffer_manager/bm_file_handle.cpp
+++ b/src/storage/buffer_manager/bm_file_handle.cpp
@@ -20,31 +20,22 @@ BMFileHandle::BMFileHandle(const std::string& path, uint8_t flags, BufferManager
     PageSizeClass pageSizeClass, FileVersionedType fileVersionedType,
     common::VirtualFileSystem* vfs)
     : FileHandle{path, flags, vfs}, fileVersionedType{fileVersionedType}, bm{bm},
-      pageSizeClass{pageSizeClass} {
-    initPageStatesAndGroups();
+      pageSizeClass{pageSizeClass}, pageStates{numPages, pageCapacity},
+      frameGroupIdxes{getNumPageGroups(), getNumPageGroups()} {
+    for (auto i = 0u; i < frameGroupIdxes.size(); i++) {
+        frameGroupIdxes[i] = bm->addNewFrameGroup(pageSizeClass);
+    }
 }
 
 BMFileHandle::~BMFileHandle() {
     bm->removeFilePagesFromFrames(*this);
 }
 
-void BMFileHandle::initPageStatesAndGroups() {
-    pageStates.resize(pageCapacity);
-    for (auto i = 0ull; i < numPages; i++) {
-        pageStates[i] = std::make_unique<PageState>();
-    }
-    auto numPageGroups = getNumPageGroups();
-    frameGroupIdxes.resize(numPageGroups);
-    for (auto i = 0u; i < numPageGroups; i++) {
-        frameGroupIdxes[i] = bm->addNewFrameGroup(pageSizeClass);
-    }
-}
-
 page_idx_t BMFileHandle::addNewPageWithoutLock() {
     if (numPages == pageCapacity) {
         addNewPageGroupWithoutLock();
    }
-    pageStates[numPages] = std::make_unique<PageState>();
+    pageStates[numPages].resetToEvicted();
    return numPages++;
 }