Skip to content

Commit

Permalink
single pool for bm and mm
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 13, 2023
1 parent 1a4ae3d commit 94d4c22
Show file tree
Hide file tree
Showing 64 changed files with 5,559 additions and 908 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.4 LANGUAGES CXX)
project(Kuzu VERSION 0.0.5 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down Expand Up @@ -95,6 +95,7 @@ include_directories(third_party/nlohmann_json)
include_directories(third_party/utf8proc/include)
include_directories(third_party/pybind11/include)
include_directories(third_party/re2/include)
include_directories(third_party/concurrentqueue)

add_subdirectory(third_party)
add_subdirectory(src)
Expand Down
2 changes: 1 addition & 1 deletion benchmark/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def serialize(dataset_name, dataset_path, serialized_graph_path):
try:
# Run kuzu shell one query at a time. This ensures a new process is
# created for each query to avoid memory leaks.
subprocess.run([kuzu_exec_path, '-i', serialized_graph_path],
subprocess.run([kuzu_exec_path, serialized_graph_path],
input=(s + ";" + "\n").encode("ascii"), check=True)
except subprocess.CalledProcessError as e:
logging.error('Error executing query: %s', s)
Expand Down
2 changes: 1 addition & 1 deletion src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ DataType Binder::bindDataType(const std::string& dataType) {
"The number of elements in a fixed list must be greater than 0. Given: " +
std::to_string(boundType.fixedNumElementsInList) + ".");
}
if (Types::getDataTypeSize(boundType) > common::BufferPoolConstants::DEFAULT_PAGE_SIZE) {
if (Types::getDataTypeSize(boundType) > common::BufferPoolConstants::PAGE_4KB_SIZE) {
throw common::BinderException("The size of fixed list is larger than a "
"DEFAULT_PAGE_SIZE, which is not supported yet.");
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/in_mem_overflow_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ namespace kuzu {
namespace common {

uint8_t* InMemOverflowBuffer::allocateSpace(uint64_t size) {
assert(size <= BufferPoolConstants::LARGE_PAGE_SIZE);
assert(size <= BufferPoolConstants::PAGE_256KB_SIZE);
if (requireNewBlock(size)) {
allocateNewBlock();
}
auto data = currentBlock->block->data + currentBlock->currentOffset;
auto data = currentBlock->block->buffer + currentBlock->currentOffset;
currentBlock->currentOffset += size;
return data;
}
Expand Down
25 changes: 13 additions & 12 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ constexpr uint64_t DEFAULT_CHECKPOINT_WAIT_TIMEOUT_FOR_TRANSACTIONS_TO_LEAVE_IN_

const std::string INTERNAL_ID_SUFFIX = "_id";

// Supported page size classes. Each enumerator's value is the base-2 logarithm of the page size
// in bytes, so the size in bytes is obtained by shifting 1 left by the enumerator's value.
enum PageSizeClass : uint8_t {
// 2^12 = 4096 bytes (4KB).
PAGE_4KB = 12,
// 2^18 = 262144 bytes (256KB).
PAGE_256KB = 18,
};

// Currently the system supports files with two different page sizes, which we refer to as
// DEFAULT_PAGE_SIZE and LARGE_PAGE_SIZE. The default page size is the unit of read/write to the
// database files, such as those that store columns or lists. For now, this value cannot be changed.
Expand All @@ -29,23 +34,18 @@ const std::string INTERNAL_ID_SUFFIX = "_id";
// number of bytes needed for an edge is 20 bytes so 11 + log_2(20) = 15.xxx, so certainly over
// 2^16-size pages, we cannot utilize the page for storing adjacency lists.
struct BufferPoolConstants {
static constexpr uint64_t DEFAULT_PAGE_SIZE_LOG_2 = 12;
static constexpr uint64_t DEFAULT_PAGE_SIZE = 1 << DEFAULT_PAGE_SIZE_LOG_2;
static constexpr uint64_t PAGE_4KB_SIZE = (std::uint64_t)1 << PageSizeClass::PAGE_4KB;
// Page size for files with large pages, e.g., temporary files that are used by operators that
// may require large amounts of memory.
static constexpr uint64_t LARGE_PAGE_SIZE_LOG_2 = 18;
static constexpr uint64_t LARGE_PAGE_SIZE = 1 << LARGE_PAGE_SIZE_LOG_2;
static constexpr uint64_t PAGE_256KB_SIZE = (std::uint64_t)1 << PageSizeClass::PAGE_256KB;
static constexpr double DEFAULT_BUFFER_POOL_RATIO = 0.8;

static constexpr uint64_t PURGE_EVICTION_QUEUE_INTERVAL = 1024;
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
static constexpr uint64_t DEFAULT_MMAP_MAX_SIZE = (uint64_t)1 << 45; // (32TB)
};

struct StorageConstants {
// The default amount of memory pre-allocated to both the default and large pages buffer pool.
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE = 1ull << 30; // (1GB)
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 27; // (128MB)
// The default ratio of system memory allocated to buffer pools (including default and large).
static constexpr double DEFAULT_BUFFER_POOL_RATIO = 0.8;
// The default ratio of buffer allocated to default and large pages.
static constexpr double DEFAULT_PAGES_BUFFER_RATIO = 0.75;
static constexpr double LARGE_PAGES_BUFFER_RATIO = 1.0 - DEFAULT_PAGES_BUFFER_RATIO;
static constexpr char OVERFLOW_FILE_SUFFIX[] = ".ovf";
static constexpr char COLUMN_FILE_SUFFIX[] = ".col";
static constexpr char LISTS_FILE_SUFFIX[] = ".lists";
Expand All @@ -63,6 +63,7 @@ struct StorageConstants {
// The number of pages that we add at one time when we need to grow a file.
static constexpr uint64_t PAGE_GROUP_SIZE_LOG2 = 10;
static constexpr uint64_t PAGE_GROUP_SIZE = (uint64_t)1 << PAGE_GROUP_SIZE_LOG2;
static constexpr uint64_t PAGE_IDX_IN_GROUP_MASK = ((uint64_t)1 << PAGE_GROUP_SIZE_LOG2) - 1;
};

struct ListsMetadataConstants {
Expand Down
22 changes: 5 additions & 17 deletions src/include/common/in_mem_overflow_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ namespace common {

struct BufferBlock {
public:
explicit BufferBlock(std::unique_ptr<storage::MemoryBlock> block)
: size{block->size}, currentOffset{0}, block{std::move(block)} {}
explicit BufferBlock(std::unique_ptr<storage::BufferHandle> block)
: size{block->allocator->getPageSize()}, currentOffset{0}, block{std::move(block)} {}

public:
uint64_t size;
uint64_t currentOffset;
std::unique_ptr<storage::MemoryBlock> block;
std::unique_ptr<storage::BufferHandle> block;

inline void resetCurrentOffset() { currentOffset = 0; }
};
Expand All @@ -27,15 +27,6 @@ class InMemOverflowBuffer {
explicit InMemOverflowBuffer(storage::MemoryManager* memoryManager)
: memoryManager{memoryManager}, currentBlock{nullptr} {};

// The blocks used are allocated through the MemoryManager but are backed by the
// BufferManager. We need to therefore release them back by calling
// memoryManager->freeBlock.
~InMemOverflowBuffer() {
for (auto& block : blocks) {
memoryManager->freeBlock(block->block->pageIdx);
}
}

uint8_t* allocateSpace(uint64_t size);

inline void merge(InMemOverflowBuffer& other) {
Expand All @@ -54,9 +45,6 @@ class InMemOverflowBuffer {
inline void resetBuffer() {
if (!blocks.empty()) {
auto firstBlock = std::move(blocks[0]);
for (auto i = 1u; i < blocks.size(); ++i) {
memoryManager->freeBlock(blocks[i]->block->pageIdx);
}
blocks.clear();
firstBlock->resetCurrentOffset();
blocks.push_back(std::move(firstBlock));
Expand All @@ -68,10 +56,10 @@ class InMemOverflowBuffer {

private:
inline bool requireNewBlock(uint64_t sizeToAllocate) {
if (sizeToAllocate > BufferPoolConstants::LARGE_PAGE_SIZE) {
if (sizeToAllocate > BufferPoolConstants::PAGE_256KB_SIZE) {
throw RuntimeException("Require size " + std::to_string(sizeToAllocate) +
" greater than single block size " +
std::to_string(BufferPoolConstants::LARGE_PAGE_SIZE) + ".");
std::to_string(BufferPoolConstants::PAGE_256KB_SIZE) + ".");
}
return currentBlock == nullptr ||
(currentBlock->currentOffset + sizeToAllocate) > currentBlock->size;
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ using hash_t = uint64_t;
using page_idx_t = uint32_t;
using page_offset_t = uint32_t;
constexpr page_idx_t PAGE_IDX_MAX = UINT32_MAX;
using page_group_idx_t = uint32_t;
using list_header_t = uint32_t;
using property_id_t = uint32_t;
constexpr property_id_t INVALID_PROPERTY_ID = UINT32_MAX;
Expand Down
1 change: 1 addition & 0 deletions src/include/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <vector>

#include "common/constants.h"
#include "common/types/types.h"
#include "exception.h"
#include "spdlog/fmt/fmt.h"

Expand Down
6 changes: 1 addition & 5 deletions src/include/main/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,10 @@ KUZU_API struct SystemConfig {
/**
* @brief Creates a SystemConfig object.
* @param bufferPoolSize Buffer pool size in bytes.
* @note Currently, we have two internal buffer pools with different frame size of 4KB and
* 256KB. When a user sets a customized buffer pool size, it is divided into two internal pools
* based on the DEFAULT_PAGES_BUFFER_RATIO and LARGE_PAGES_BUFFER_RATIO.
*/
explicit SystemConfig(uint64_t bufferPoolSize);

uint64_t defaultPageBufferPoolSize;
uint64_t largePageBufferPoolSize;
uint64_t bufferPoolSize;
uint64_t maxNumThreads;
};

Expand Down
14 changes: 6 additions & 8 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,19 @@ class DataBlock {
public:
explicit DataBlock(storage::MemoryManager* memoryManager)
: numTuples{0}, memoryManager{memoryManager} {
block = memoryManager->allocateBlock(true);
freeSize = block->size;
block = memoryManager->allocateBlock(true /* initializeToZero */);
freeSize = block->allocator->getPageSize();
}

DataBlock(DataBlock&& other) = default;

~DataBlock() { memoryManager->freeBlock(block->pageIdx); }

inline uint8_t* getData() const { return block->data; }
inline uint8_t* getData() const { return block->buffer; }
inline void resetNumTuplesAndFreeSize() {
freeSize = common::BufferPoolConstants::LARGE_PAGE_SIZE;
freeSize = common::BufferPoolConstants::PAGE_256KB_SIZE;
numTuples = 0;
}
inline void resetToZero() {
memset(block->data, 0, common::BufferPoolConstants::LARGE_PAGE_SIZE);
memset(block->buffer, 0, common::BufferPoolConstants::PAGE_256KB_SIZE);
}

static void copyTuples(DataBlock* blockToCopyFrom, ft_tuple_idx_t tupleIdxToCopyFrom,
Expand All @@ -60,7 +58,7 @@ class DataBlock {
storage::MemoryManager* memoryManager;

private:
std::unique_ptr<storage::MemoryBlock> block;
std::unique_ptr<storage::BufferHandle> block;
};

class DataBlockCollection {
Expand Down
34 changes: 34 additions & 0 deletions src/include/storage/buffer_manager/buffer_allocator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include <mutex>

#include "common/constants.h"
#include "common/types/types.h"

namespace kuzu {
namespace storage {

// Maps page indices to frame addresses inside a single contiguous byte region. All frames served
// by one allocator have the same size, 2^pageSizeClass bytes.
// NOTE(review): the constructor, destructor and addNewPageGroup() are defined outside this header;
// judging by the DEFAULT_MMAP_MAX_SIZE default, `region` is presumably an mmap-ed range of up to
// maxSize bytes -- confirm against the implementation file.
class BufferAllocator {
    friend class BufferManager;

public:
    // pageSizeClass: log2 of the frame size in bytes.
    // maxSize: upper bound handed to the allocator; defaults to DEFAULT_MMAP_MAX_SIZE.
    explicit BufferAllocator(common::PageSizeClass pageSizeClass,
        uint64_t maxSize = common::BufferPoolConstants::DEFAULT_MMAP_MAX_SIZE);
    ~BufferAllocator();

    // Adds one more page group and returns its index.
    common::page_group_idx_t addNewPageGroup();

    // Returns the address of the frame that belongs to pageIdx, i.e. the start of `region` plus
    // pageIdx * 2^pageSizeClass bytes. The index is widened to 64 bits before shifting so that
    // the byte offset is not truncated to the width of page_idx_t.
    inline uint8_t* getFrame(common::page_idx_t pageIdx) {
        const auto byteOffset = static_cast<std::uint64_t>(pageIdx) << pageSizeClass;
        return region + byteOffset;
    }

private:
    std::mutex mtx;
    // Start address of the contiguous region that frames are carved from.
    uint8_t* region;
    common::PageSizeClass pageSizeClass;
    uint64_t numPageGroups;
    uint64_t maxNumPageGroups;
};

} // namespace storage
} // namespace kuzu
86 changes: 67 additions & 19 deletions src/include/storage/buffer_manager/buffer_managed_file_handle.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,52 @@
#pragma once

#include "storage/buffer_manager/buffer_allocator.h"
#include "storage/file_handle.h"

namespace kuzu {
namespace storage {

// Bit layout of the 64-bit PageState::pageIdx word:
//   bit 63      -> LOADED_MASK:   set when the page is loaded.
//   bit 62      -> DIRTY_MASK:    set when the page is dirty.
//   bits 0..61  -> PAGE_IDX_MASK: the page idx inside the file.
static constexpr uint64_t LOADED_MASK = 0x8000000000000000;
static constexpr uint64_t DIRTY_MASK = 0x4000000000000000;
static constexpr uint64_t PAGE_IDX_MASK = 0x3FFFFFFFFFFFFFFF;

// Mode passed to PageState::acquireLock / BufferManagedFileHandle::acquirePageLock.
// NOTE(review): presumably SPIN keeps retrying until the lock is taken while NON_BLOCKING returns
// after a single attempt -- confirm against the acquireLock definition, which is not in this header.
enum class LockMode : uint8_t { SPIN = 0, NON_BLOCKING = 1 };

class BufferManagedFileHandle;

// Keeps state information for a disk page or an in memory data block: a lock flag, the packed
// loaded/dirty/page-idx word, the owning file handle, a pin count and an eviction timestamp.
class PageState {
public:
    // True when the LOADED bit of the packed word is set.
    inline bool isLoaded() const { return pageIdx & LOADED_MASK; }
    inline void setDirty() { pageIdx |= DIRTY_MASK; }
    inline void clearDirty() { pageIdx &= ~DIRTY_MASK; }
    inline bool isDirty() const { return pageIdx & DIRTY_MASK; }
    // Returns the file handle recorded for this page; nullptr until one has been assigned.
    inline BufferManagedFileHandle* getFileHandle() const { return fileHandle; }
    // Strips the LOADED and DIRTY bits and returns the remaining bits narrowed to page_idx_t.
    inline common::page_idx_t getPageIdx() const {
        return static_cast<common::page_idx_t>(pageIdx & PAGE_IDX_MASK);
    }
    // fetch_add / fetch_sub return the value held BEFORE the modification.
    inline uint64_t incrementPinCount() { return pinCount.fetch_add(1); }
    inline uint64_t decrementPinCount() { return pinCount.fetch_sub(1); }
    // The counter is stored as a 32-bit atomic, so newPinCount is converted to uint32_t on store.
    inline void setPinCount(uint64_t newPinCount) { pinCount.store(newPinCount); }
    inline uint64_t getPinCount() const { return pinCount.load(); }
    inline uint64_t getEvictionTimestamp() const { return evictionTimestamp.load(); }
    // Returns the timestamp value held before the increment.
    inline uint64_t incrementEvictionTimestamp() { return evictionTimestamp.fetch_add(1); }
    inline void releaseLock() { lock.clear(); }

    // Defined outside this header.
    bool acquireLock(LockMode lockMode);
    void setLoaded(BufferManagedFileHandle* fileHandle_, common::page_idx_t pageIdx_);
    void resetState();

private:
    std::atomic_flag lock = ATOMIC_FLAG_INIT;
    // The highest bit indicates whether this block is loaded, the second highest bit indicates
    // whether it is dirty, and the remaining 62 bits record the page idx inside the file.
    uint64_t pageIdx = 0;
    // Initialized to nullptr so that getFileHandle() never returns an indeterminate pointer on a
    // PageState that has not been given a file handle yet.
    BufferManagedFileHandle* fileHandle = nullptr;
    std::atomic<uint32_t> pinCount = 0;
    std::atomic<uint64_t> evictionTimestamp = 0;
};

// BufferManagedFileHandle is a file handle that is backed by BufferManager. It holds the state of
// each page in the file. File Handle is the bridge between a Column/Lists/Index and the Buffer
// Manager that abstracts the file in which that Column/Lists/Index is stored.
Expand All @@ -18,11 +60,8 @@ class BufferManagedFileHandle : public FileHandle {
NON_VERSIONED_FILE = 1 // The file does not have any versioned pages in wal file.
};

BufferManagedFileHandle(
const std::string& path, uint8_t flags, FileVersionedType fileVersionedType);

bool acquirePageLock(common::page_idx_t pageIdx, bool block);
inline void releasePageLock(common::page_idx_t pageIdx) { pageLocks[pageIdx]->clear(); }
BufferManagedFileHandle(const std::string& path, uint8_t flags,
FileVersionedType fileVersionedType, BufferAllocator* bufferRegion);

void createPageVersionGroupIfNecessary(common::page_idx_t pageIdx);

Expand All @@ -38,34 +77,43 @@ class BufferManagedFileHandle : public FileHandle {
void setWALPageVersion(common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL);
void setWALPageVersionNoLock(common::page_idx_t pageIdx, common::page_idx_t pageVersion);

inline common::page_idx_t getFrameIdx(common::page_idx_t pageIdx) {
return pageIdxToFrameMap[pageIdx]->load();
inline bool acquirePageLock(common::page_idx_t pageIdx, LockMode lockMode) {
return getPageState(pageIdx)->acquireLock(lockMode);
}
inline void swizzle(common::page_idx_t pageIdx, common::page_idx_t swizzledVal) {
pageIdxToFrameMap[pageIdx]->store(swizzledVal);
inline void releasePageLock(common::page_idx_t pageIdx) {
getPageState(pageIdx)->releaseLock();
}
inline void unswizzle(common::page_idx_t pageIdx) {
pageIdxToFrameMap[pageIdx]->store(UINT32_MAX);
inline PageState* getPageState(common::page_idx_t pageIdx) {
assert(pageIdx < numPages && pageStates[pageIdx]);
return pageStates[pageIdx].get();
}

static inline bool isAFrame(common::page_idx_t mappedFrameIdx) {
return UINT32_MAX != mappedFrameIdx;
inline void clearPageState(common::page_idx_t pageIdx) {
assert(pageIdx < numPages && pageStates[pageIdx]);
pageStates[pageIdx]->resetState();
}
inline uint8_t* getFrame(common::page_idx_t localPageIdx) {
assert(localPageIdx < pageCapacity);
auto localPageGroupIdx = localPageIdx >> common::StorageConstants::PAGE_GROUP_SIZE_LOG2;
common::page_idx_t globalPageIdx =
(pageGroupIdxes[localPageGroupIdx] << common::StorageConstants::PAGE_GROUP_SIZE_LOG2) +
(localPageIdx & common::StorageConstants::PAGE_IDX_IN_GROUP_MASK);
return bufferRegion->getFrame(globalPageIdx);
}

private:
void initPageIdxToFrameMapAndLocks();
void initPageStatesAndGroups();
common::page_idx_t addNewPageWithoutLock() override;
bool acquire(common::page_idx_t pageIdx);
void addNewPageGroupWithoutLock();
void removePageIdxAndTruncateIfNecessaryWithoutLock(common::page_idx_t pageIdxToRemove);
void resizePageGroupLocksAndPageVersionsWithoutLock();
uint32_t getNumPageGroups() {
return ceil((double)numPages / common::StorageConstants::PAGE_GROUP_SIZE);
}

private:
FileVersionedType fileVersionedType;
std::vector<std::unique_ptr<std::atomic_flag>> pageLocks;
std::vector<std::unique_ptr<std::atomic<common::page_idx_t>>> pageIdxToFrameMap;
BufferAllocator* bufferRegion;
std::vector<std::unique_ptr<PageState>> pageStates;
std::vector<common::page_group_idx_t> pageGroupIdxes;
std::vector<std::vector<common::page_idx_t>> pageVersions;
std::vector<std::unique_ptr<std::atomic_flag>> pageGroupLocks;
};
Expand Down
Loading

0 comments on commit 94d4c22

Please sign in to comment.