Skip to content

Commit

Permalink
single pool for bm and mm
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 16, 2023
1 parent e459a61 commit cd1451f
Show file tree
Hide file tree
Showing 72 changed files with 5,748 additions and 618 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.4 LANGUAGES CXX)
project(Kuzu VERSION 0.0.5 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down Expand Up @@ -100,6 +100,7 @@ include_directories(third_party/nlohmann_json)
include_directories(third_party/utf8proc/include)
include_directories(third_party/pybind11/include)
include_directories(third_party/re2/include)
include_directories(third_party/concurrentqueue)

add_subdirectory(third_party)
add_subdirectory(src)
Expand Down
2 changes: 1 addition & 1 deletion src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ DataType Binder::bindDataType(const std::string& dataType) {
"The number of elements in a fixed list must be greater than 0. Given: " +
std::to_string(boundType.fixedNumElementsInList) + ".");
}
if (Types::getDataTypeSize(boundType) > common::BufferPoolConstants::DEFAULT_PAGE_SIZE) {
if (Types::getDataTypeSize(boundType) > common::BufferPoolConstants::PAGE_4KB_SIZE) {
throw common::BinderException("The size of fixed list is larger than a "
"DEFAULT_PAGE_SIZE, which is not supported yet.");
}
Expand Down
6 changes: 3 additions & 3 deletions src/common/in_mem_overflow_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ namespace kuzu {
namespace common {

uint8_t* InMemOverflowBuffer::allocateSpace(uint64_t size) {
assert(size <= BufferPoolConstants::LARGE_PAGE_SIZE);
assert(size <= BufferPoolConstants::PAGE_256KB_SIZE);
if (requireNewBlock(size)) {
allocateNewBlock();
}
auto data = currentBlock->block->data + currentBlock->currentOffset;
auto data = currentBlock->block->buffer + currentBlock->currentOffset;
currentBlock->currentOffset += size;
return data;
}

void InMemOverflowBuffer::allocateNewBlock() {
auto newBlock = make_unique<BufferBlock>(
memoryManager->allocateBlock(false /* do not initialize to zero */));
memoryManager->allocateBuffer(false /* do not initialize to zero */));
currentBlock = newBlock.get();
blocks.push_back(std::move(newBlock));
}
Expand Down
41 changes: 23 additions & 18 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,35 @@ constexpr uint64_t DEFAULT_CHECKPOINT_WAIT_TIMEOUT_FOR_TRANSACTIONS_TO_LEAVE_IN_

const std::string INTERNAL_ID_SUFFIX = "_id";

enum PageSizeClass : uint8_t {
PAGE_4KB = 12,
PAGE_256KB = 18,
};

// Currently the system supports files with 2 different page sizes, which we refer to as
// DEFAULT_PAGE_SIZE and LARGE_PAGE_SIZE. Default size of the page which is the unit of read/write
// to the database files, such as to store columns or lists. For now, this value cannot be changed.
// But technically it can change from 2^12 to 2^16. 2^12 lower bound is assuming the OS page size is
// 4K. 2^16 is because currently we leave 11 fixed number of bits for relOffInPage and the maximum
// number of bytes needed for an edge is 20 bytes so 11 + log_2(20) = 15.xxx, so certainly over
// 2^16-size pages, we cannot utilize the page for storing adjacency lists.
// PAGE_4KB_SIZE and PAGE_256KB_SIZE. PAGE_4KB_SIZE is the default size of the page which is the
// unit of read/write to the database files, such as to store columns or lists. For now, this value
// cannot be changed. But technically it can change from 2^12 to 2^16. 2^12 lower bound is assuming
// the OS page size is 4K. 2^16 is because currently we leave 11 fixed number of bits for
// relOffInPage and the maximum number of bytes needed for an edge is 20 bytes so 11 + log_2(20)
// = 15.xxx, so certainly over 2^16-size pages, we cannot utilize the page for storing adjacency
// lists.
struct BufferPoolConstants {
static constexpr uint64_t DEFAULT_PAGE_SIZE_LOG_2 = 12;
static constexpr uint64_t DEFAULT_PAGE_SIZE = 1 << DEFAULT_PAGE_SIZE_LOG_2;
static constexpr uint64_t PAGE_4KB_SIZE = (std::uint64_t)1 << PageSizeClass::PAGE_4KB;
// Page size for files with large pages, e.g., temporary files that are used by operators that
// may require large amounts of memory.
static constexpr uint64_t LARGE_PAGE_SIZE_LOG_2 = 18;
static constexpr uint64_t LARGE_PAGE_SIZE = 1 << LARGE_PAGE_SIZE_LOG_2;
static constexpr uint64_t PAGE_256KB_SIZE = (std::uint64_t)1 << PageSizeClass::PAGE_256KB;
// If a user does not specify a max size for BM, we by default set the max size of BM to
// maxPhyMemSize * DEFAULT_PHY_MEM_SIZE_RATIO_FOR_BM.
static constexpr double DEFAULT_PHY_MEM_SIZE_RATIO_FOR_BM = 0.8;
static constexpr uint64_t PURGE_EVICTION_QUEUE_INTERVAL = 1024;
// The default max size for a VMRegion.
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 45; // (32TB)

static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
};

struct StorageConstants {
// The default amount of memory pre-allocated to both the default and large pages buffer pool.
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE = 1ull << 30; // (1GB)
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 27; // (128MB)
// The default ratio of system memory allocated to buffer pools (including default and large).
static constexpr double DEFAULT_BUFFER_POOL_RATIO = 0.8;
// The default ratio of buffer allocated to default and large pages.
static constexpr double DEFAULT_PAGES_BUFFER_RATIO = 0.75;
static constexpr double LARGE_PAGES_BUFFER_RATIO = 1.0 - DEFAULT_PAGES_BUFFER_RATIO;
static constexpr char OVERFLOW_FILE_SUFFIX[] = ".ovf";
static constexpr char COLUMN_FILE_SUFFIX[] = ".col";
static constexpr char LISTS_FILE_SUFFIX[] = ".lists";
Expand All @@ -63,6 +67,7 @@ struct StorageConstants {
// The number of pages that we add at one time when we need to grow a file.
static constexpr uint64_t PAGE_GROUP_SIZE_LOG2 = 10;
static constexpr uint64_t PAGE_GROUP_SIZE = (uint64_t)1 << PAGE_GROUP_SIZE_LOG2;
static constexpr uint64_t PAGE_IDX_IN_GROUP_MASK = ((uint64_t)1 << PAGE_GROUP_SIZE_LOG2) - 1;
};

struct ListsMetadataConstants {
Expand Down
22 changes: 5 additions & 17 deletions src/include/common/in_mem_overflow_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ namespace common {

struct BufferBlock {
public:
explicit BufferBlock(std::unique_ptr<storage::MemoryBlock> block)
: size{block->size}, currentOffset{0}, block{std::move(block)} {}
explicit BufferBlock(std::unique_ptr<storage::MemoryBuffer> block)
: size{block->allocator->getPageSize()}, currentOffset{0}, block{std::move(block)} {}

public:
uint64_t size;
uint64_t currentOffset;
std::unique_ptr<storage::MemoryBlock> block;
std::unique_ptr<storage::MemoryBuffer> block;

inline void resetCurrentOffset() { currentOffset = 0; }
};
Expand All @@ -27,15 +27,6 @@ class InMemOverflowBuffer {
explicit InMemOverflowBuffer(storage::MemoryManager* memoryManager)
: memoryManager{memoryManager}, currentBlock{nullptr} {};

// The blocks used are allocated through the MemoryManager but are backed by the
// BufferManager. We need to therefore release them back by calling
// memoryManager->freeBlock.
~InMemOverflowBuffer() {
for (auto& block : blocks) {
memoryManager->freeBlock(block->block->pageIdx);
}
}

uint8_t* allocateSpace(uint64_t size);

inline void merge(InMemOverflowBuffer& other) {
Expand All @@ -54,9 +45,6 @@ class InMemOverflowBuffer {
inline void resetBuffer() {
if (!blocks.empty()) {
auto firstBlock = std::move(blocks[0]);
for (auto i = 1u; i < blocks.size(); ++i) {
memoryManager->freeBlock(blocks[i]->block->pageIdx);
}
blocks.clear();
firstBlock->resetCurrentOffset();
blocks.push_back(std::move(firstBlock));
Expand All @@ -68,10 +56,10 @@ class InMemOverflowBuffer {

private:
inline bool requireNewBlock(uint64_t sizeToAllocate) {
if (sizeToAllocate > BufferPoolConstants::LARGE_PAGE_SIZE) {
if (sizeToAllocate > BufferPoolConstants::PAGE_256KB_SIZE) {
throw RuntimeException("Require size " + std::to_string(sizeToAllocate) +
" greater than single block size " +
std::to_string(BufferPoolConstants::LARGE_PAGE_SIZE) + ".");
std::to_string(BufferPoolConstants::PAGE_256KB_SIZE) + ".");
}
return currentBlock == nullptr ||
(currentBlock->currentOffset + sizeToAllocate) > currentBlock->size;
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ namespace common {
using sel_t = uint16_t;
using hash_t = uint64_t;
using page_idx_t = uint32_t;
using frame_idx_t = page_idx_t;
using page_offset_t = uint32_t;
constexpr page_idx_t PAGE_IDX_MAX = UINT32_MAX;
using page_group_idx_t = uint32_t;
using frame_group_idx_t = page_group_idx_t;
using list_header_t = uint32_t;
using property_id_t = uint32_t;
constexpr property_id_t INVALID_PROPERTY_ID = UINT32_MAX;
Expand Down
1 change: 1 addition & 0 deletions src/include/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <vector>

#include "common/constants.h"
#include "common/types/types.h"
#include "exception.h"
#include "spdlog/fmt/fmt.h"

Expand Down
6 changes: 1 addition & 5 deletions src/include/main/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,10 @@ KUZU_API struct SystemConfig {
/**
* @brief Creates a SystemConfig object.
* @param bufferPoolSize Buffer pool size in bytes.
* @note Currently, we have two internal buffer pools with different frame size of 4KB and
* 256KB. When a user sets a customized buffer pool size, it is divided into two internal pools
* based on the DEFAULT_PAGES_BUFFER_RATIO and LARGE_PAGES_BUFFER_RATIO.
*/
explicit SystemConfig(uint64_t bufferPoolSize);

uint64_t defaultPageBufferPoolSize;
uint64_t largePageBufferPoolSize;
uint64_t bufferPoolSize;
uint64_t maxNumThreads;
};

Expand Down
14 changes: 6 additions & 8 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,19 @@ class DataBlock {
public:
explicit DataBlock(storage::MemoryManager* memoryManager)
: numTuples{0}, memoryManager{memoryManager} {
block = memoryManager->allocateBlock(true);
freeSize = block->size;
block = memoryManager->allocateBuffer(true /* initializeToZero */);
freeSize = block->allocator->getPageSize();
}

DataBlock(DataBlock&& other) = default;

~DataBlock() { memoryManager->freeBlock(block->pageIdx); }

inline uint8_t* getData() const { return block->data; }
inline uint8_t* getData() const { return block->buffer; }
inline void resetNumTuplesAndFreeSize() {
freeSize = common::BufferPoolConstants::LARGE_PAGE_SIZE;
freeSize = common::BufferPoolConstants::PAGE_256KB_SIZE;
numTuples = 0;
}
inline void resetToZero() {
memset(block->data, 0, common::BufferPoolConstants::LARGE_PAGE_SIZE);
memset(block->buffer, 0, common::BufferPoolConstants::PAGE_256KB_SIZE);
}

static void copyTuples(DataBlock* blockToCopyFrom, ft_tuple_idx_t tupleIdxToCopyFrom,
Expand All @@ -60,7 +58,7 @@ class DataBlock {
storage::MemoryManager* memoryManager;

private:
std::unique_ptr<storage::MemoryBlock> block;
std::unique_ptr<storage::MemoryBuffer> block;
};

class DataBlockCollection {
Expand Down
128 changes: 128 additions & 0 deletions src/include/storage/buffer_manager/bm_managed_file_handle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#pragma once

#include "storage/buffer_manager/vm_region.h"
#include "storage/file_handle.h"

namespace kuzu {
namespace storage {

// Bit layout of the packed 64-bit page word kept in PageState::pageIdx:
//   bit 63       -> page is loaded
//   bit 62       -> page is dirty
//   bits 0..61   -> page idx inside the file
static constexpr uint64_t LOADED_MASK = uint64_t{1} << 63;
static constexpr uint64_t DIRTY_MASK = uint64_t{1} << 62;
// Everything that is not one of the two flag bits.
static constexpr uint64_t PAGE_IDX_MASK = ~(LOADED_MASK | DIRTY_MASK);

// Mode passed to PageState::acquireLock / BMFileHandle::acquirePageLock.
// NOTE(review): presumably SPIN retries until the lock is taken while NON_BLOCKING gives up
// after a single attempt -- confirm against the definition of PageState::acquireLock.
enum class LockMode : uint8_t {
    SPIN = 0,
    NON_BLOCKING = 1,
};

class BMFileHandle;

// Keeps the state information of a page in a file.
//
// The loaded flag, the dirty flag and the page idx are packed into a single 64-bit word (see
// LOADED_MASK, DIRTY_MASK and PAGE_IDX_MASK). The pin count and the eviction timestamp are kept
// in separate atomics, and `lock` is a per-page atomic flag.
// NOTE(review): the packed word itself is a plain (non-atomic) uint64_t; presumably callers hold
// the page lock while reading or writing it -- confirm against the buffer manager.
class PageState {
public:
    // True when the loaded bit is set in the packed word.
    inline bool isCached() const { return (pageIdx & LOADED_MASK) != 0; }
    // Sets / clears / tests the dirty bit; the other bits of the packed word are untouched.
    inline void setDirty() { pageIdx |= DIRTY_MASK; }
    inline void clearDirty() { pageIdx &= ~DIRTY_MASK; }
    inline bool isDirty() const { return (pageIdx & DIRTY_MASK) != 0; }
    // Returns the stored file handle; nullptr until one has been assigned.
    inline BMFileHandle* getFileHandle() const { return fileHandle; }
    // Returns the page idx with both flag bits masked out, narrowed to page_idx_t.
    inline common::page_idx_t getPageIdx() const {
        return static_cast<common::page_idx_t>(pageIdx & PAGE_IDX_MASK);
    }
    // Atomically adjusts the pin count; both return the value held BEFORE the update.
    inline uint64_t incrementPinCount() { return pinCount.fetch_add(1); }
    inline uint64_t decrementPinCount() { return pinCount.fetch_sub(1); }
    // Note: the counter is stored as 32 bits, so `newPinCount` is narrowed on store.
    inline void setPinCount(uint64_t newPinCount) { pinCount.store(newPinCount); }
    inline uint64_t getPinCount() const { return pinCount.load(); }
    inline uint64_t getEvictionTimestamp() const { return evictionTimestamp.load(); }
    // Returns the timestamp held BEFORE the increment.
    inline uint64_t incrementEvictionTimestamp() { return evictionTimestamp.fetch_add(1); }
    // Clears the lock flag so that another acquireLock call can take it.
    inline void releaseLock() { lock.clear(); }

    // The three members below are defined out of line.
    bool acquireLock(LockMode lockMode);
    void setCached(BMFileHandle* fileHandle_, common::page_idx_t pageIdx_);
    void resetState();

private:
    std::atomic_flag lock = ATOMIC_FLAG_INIT;
    // Highest 1st bit indicates if this page is loaded or not, 2nd bit indicates if this
    // page is dirty or not. The rest 62 bits records the page idx inside the file.
    uint64_t pageIdx = 0;
    // Explicitly null so that getFileHandle() never returns an indeterminate pointer on a
    // freshly constructed state.
    BMFileHandle* fileHandle = nullptr;
    std::atomic<uint32_t> pinCount = 0;
    std::atomic<uint64_t> evictionTimestamp = 0;
};

// BMFileHandle is a file handle that is backed by BufferManager. It holds the state of
// each page in the file. File Handle is the bridge between a Column/Lists/Index and the Buffer
// Manager that abstracts the file in which that Column/Lists/Index is stored.
// BMFileHandle supports two types of files: versioned and non-versioned. Versioned files
// contains mapping from pages that have updates to the versioned pages in the wal file.
// Currently, only MemoryManager and WAL files are non-versioned.
class BMFileHandle : public FileHandle {
public:
enum class FileVersionedType : uint8_t {
VERSIONED_FILE = 0, // The file is backed by versioned pages in wal file.
NON_VERSIONED_FILE = 1 // The file does not have any versioned pages in wal file.
};

BMFileHandle(const std::string& path, uint8_t flags, FileVersionedType fileVersionedType,
VMRegion* bufferRegion);

void createPageVersionGroupIfNecessary(common::page_idx_t pageIdx);

// This function is intended to be used after a fileInfo is created and we want the file
// to have not pages and page locks. Should be called after ensuring that the buffer manager
// does not hold any of the pages of the file.
void resetToZeroPagesAndPageCapacity();
void removePageIdxAndTruncateIfNecessary(common::page_idx_t pageIdx);

bool hasWALPageVersionNoPageLock(common::page_idx_t pageIdx);
void clearWALPageVersionIfNecessary(common::page_idx_t pageIdx);
common::page_idx_t getWALPageVersionNoPageLock(common::page_idx_t pageIdx);
void setWALPageVersion(common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL);
void setWALPageVersionNoLock(common::page_idx_t pageIdx, common::page_idx_t pageVersion);

inline bool acquirePageLock(common::page_idx_t pageIdx, LockMode lockMode) {
return getPageState(pageIdx)->acquireLock(lockMode);
}
inline void releasePageLock(common::page_idx_t pageIdx) {
getPageState(pageIdx)->releaseLock();
}
inline PageState* getPageState(common::page_idx_t pageIdx) {
assert(pageIdx < numPages && pageStates[pageIdx]);
return pageStates[pageIdx].get();
}
inline void clearPageState(common::page_idx_t pageIdx) {
assert(pageIdx < numPages && pageStates[pageIdx]);
pageStates[pageIdx]->resetState();
}
inline uint8_t* getFrame(common::page_idx_t pageIdx) {
auto frameIdx = getFrameIdx(pageIdx);
return vmRegion->getFrame(frameIdx);
}
inline void releaseFrame(common::page_idx_t pageIdx) {
auto frameIdx = getFrameIdx(pageIdx);
vmRegion->releaseFrame(frameIdx);
}

private:
void initPageStatesAndGroups();
common::page_idx_t addNewPageWithoutLock() override;
void addNewPageGroupWithoutLock();
void removePageIdxAndTruncateIfNecessaryWithoutLock(common::page_idx_t pageIdxToRemove);
inline common::page_group_idx_t getNumPageGroups() {
return ceil((double)numPages / common::StorageConstants::PAGE_GROUP_SIZE);
}
inline common::frame_idx_t getFrameIdx(common::page_idx_t pageIdx) {
assert(pageIdx < pageCapacity);
return (frameGroupIdxes[pageIdx >> common::StorageConstants::PAGE_GROUP_SIZE_LOG2]
<< common::StorageConstants::PAGE_GROUP_SIZE_LOG2) +
(pageIdx & common::StorageConstants::PAGE_IDX_IN_GROUP_MASK);
}

private:
FileVersionedType fileVersionedType;
VMRegion* vmRegion;
std::vector<std::unique_ptr<PageState>> pageStates;
// Each file page group corresponds to a frame group in the VMRegion.
std::vector<common::page_group_idx_t> frameGroupIdxes;
std::vector<std::vector<common::page_idx_t>> pageVersions;
std::vector<std::unique_ptr<std::atomic_flag>> pageGroupLocks;
};
} // namespace storage
} // namespace kuzu
Loading

0 comments on commit cd1451f

Please sign in to comment.