Skip to content

Commit

Permalink
Use a clock-based eviction strategy
Browse files Browse the repository at this point in the history
Instead of queueing potential pages to evict when unpinning, all pages currently in memory are kept in the evictionqueue.
When more space is needed, pages are evicted if they haven't changed since the last time we scanned them.

Based on the hash table implementation in the VMCache paper, except that
we store page states by reference and for simplicity store the eviction
candidates in a circular buffer instead of a hash table since lookups
aren't necessary.
  • Loading branch information
benjaminwinger committed Jun 11, 2024
1 parent ba0a602 commit 8e455e4
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 127 deletions.
5 changes: 4 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ add_subdirectory(extension)
add_library(kuzu STATIC ${ALL_OBJECT_FILES})
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})

set(KUZU_LIBRARIES antlr4_cypher antlr4_runtime brotlidec brotlicommon fast_float utf8proc re2 serd Threads::Threads fastpfor miniparquet zstd miniz mbedtls)
set(KUZU_LIBRARIES antlr4_cypher antlr4_runtime brotlidec brotlicommon fast_float utf8proc re2 serd Threads::Threads fastpfor miniparquet zstd miniz mbedtls )
if(NOT WIN32)
set(KUZU_LIBRARIES dl ${KUZU_LIBRARIES})
endif()
if(NOT MSVC AND NOT APPLE)
set(KUZU_LIBRARIES atomic ${KUZU_LIBRARIES})
endif()
if (ENABLE_BACKTRACES)
set(KUZU_LIBRARIES ${KUZU_LIBRARIES} cpptrace::cpptrace)
endif()
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ struct BufferPoolConstants {
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 43; // (8TB)
#endif

static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 27; // (128MB)
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
};

struct StorageConstants {
Expand Down
6 changes: 6 additions & 0 deletions src/include/storage/buffer_manager/bm_file_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "common/concurrent_vector.h"
#include "common/constants.h"
#include "common/copy_constructors.h"
#include "common/types/types.h"
#include "storage/buffer_manager/vm_region.h"
#include "storage/file_handle.h"
Expand Down Expand Up @@ -148,6 +149,8 @@ class BMFileHandle : public FileHandle {
BMFileHandle(const std::string& path, uint8_t flags, BufferManager* bm,
common::PageSizeClass pageSizeClass, FileVersionedType fileVersionedType,
common::VirtualFileSystem* vfs, main::ClientContext* context);
// File handles are registered with the buffer manager and must not be moved or copied
DELETE_COPY_AND_MOVE(BMFileHandle);

~BMFileHandle() override;

Expand Down Expand Up @@ -180,6 +183,8 @@ class BMFileHandle : public FileHandle {
// This function assumes that the caller has already acquired the wal page idx lock.
void setWALPageIdxNoLock(common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL);

uint32_t getFileIndex() const { return fileIndex; }

private:
inline PageState* getPageState(common::page_idx_t pageIdx) {
KU_ASSERT(pageIdx < numPages);
Expand Down Expand Up @@ -217,6 +222,7 @@ class BMFileHandle : public FileHandle {
// `WALPageIdxGroup` records the WAL page idx for each page in the page group.
// Accesses to this map is synchronized by `fhSharedMutex`.
std::unordered_map<common::page_group_idx_t, std::unique_ptr<WALPageIdxGroup>> walPageIdxGroups;
uint32_t fileIndex;
};
} // namespace storage
} // namespace kuzu
95 changes: 54 additions & 41 deletions src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>

#include "common/types/types.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/buffer_manager/locked_queue.h"
#include "storage/buffer_manager/memory_manager.h"

namespace kuzu {
Expand All @@ -15,55 +18,61 @@ namespace storage {
// candidate was recently accessed, it is no longer immediately evictable. See the state transition
// diagram above `BufferManager` class declaration for more details.
struct EvictionCandidate {
// If the candidate is Marked and its version is the same as the one kept inside the candidate,
// it is evictable.
friend class EvictionQueue;

public:
// If the candidate is Marked it is evictable.
inline bool isEvictable(uint64_t currPageStateAndVersion) const {
return PageState::getState(currPageStateAndVersion) == PageState::MARKED &&
PageState::getVersion(currPageStateAndVersion) == pageVersion;
return PageState::getState(currPageStateAndVersion) == PageState::MARKED;
}
// If the candidate was recently read optimistically, it is second chance evictable.
inline bool isSecondChanceEvictable(uint64_t currPageStateAndVersion) const {
return PageState::getState(currPageStateAndVersion) == PageState::UNLOCKED &&
PageState::getVersion(currPageStateAndVersion) == pageVersion;
return PageState::getState(currPageStateAndVersion) == PageState::UNLOCKED;
}

BMFileHandle* fileHandle = nullptr;
common::page_idx_t pageIdx = common::INVALID_PAGE_IDX;
PageState* pageState = nullptr;
// The version of the corresponding page at the time the candidate is enqueued.
uint64_t pageVersion = -1u;

inline bool operator==(const EvictionCandidate& other) const {
return fileHandle == other.fileHandle && pageIdx == other.pageIdx &&
pageState == other.pageState && pageVersion == other.pageVersion;
return fileIdx == other.fileIdx && pageIdx == other.pageIdx && pageState == other.pageState;
}

// Returns false if the candidate was not empty, or if another thread set the value first
bool set(const EvictionCandidate& newCandidate);

uint32_t fileIdx = UINT32_MAX;
common::page_idx_t pageIdx = common::INVALID_PAGE_IDX;
PageState* pageState = nullptr;
};

// A circular buffer queue storing eviction candidates
// One candidate should be stored for each page currently in memory
class EvictionQueue {
static constexpr EvictionCandidate EMPTY =
EvictionCandidate{UINT32_MAX, common::INVALID_PAGE_IDX, nullptr};

public:
EvictionQueue() { queue = std::make_unique<LockedQueue<EvictionCandidate>>(); }
explicit EvictionQueue(uint64_t capacity)
: insertCursor{0}, evictionCursor{0}, size{0}, capacity{capacity},
data{std::make_unique<std::atomic<EvictionCandidate>[]>(capacity)} {}

inline void enqueue(EvictionCandidate& candidate) {
std::shared_lock sLck{mtx};
queue->enqueue(candidate);
}
inline void enqueue(BMFileHandle* fileHandle, common::page_idx_t pageIdx, PageState* pageState,
uint64_t pageVersion) {
std::shared_lock sLck{mtx};
queue->enqueue(EvictionCandidate{fileHandle, pageIdx, pageState, pageVersion});
}
inline bool dequeue(EvictionCandidate& candidate) {
std::shared_lock sLck{mtx};
return queue->try_dequeue(candidate);
}
bool insert(uint32_t fileIndex, common::page_idx_t pageIndex, PageState* pageStatel);

void removeNonEvictableCandidates();
// Produces the next non-empty candidate to be tried for eviction
// Note that it is still possible (though unlikely) for another thread
// to evict this candidate, so it is not guaranteed to be empty
// The PageState should be locked, and then the atomic checked against the version used
// when locking the pagestate to make sure there wasn't a data race
std::atomic<EvictionCandidate>* next();
void removeCandidatesForFile(uint32_t fileIndex);
void clear(std::atomic<EvictionCandidate>& candidate);

void removeCandidatesForFile(BMFileHandle& fileHandle);
uint64_t getSize() const { return size; }
uint64_t getCapacity() const { return capacity; }

private:
std::shared_mutex mtx;
std::unique_ptr<LockedQueue<EvictionCandidate>> queue;
std::atomic<uint64_t> insertCursor;
std::atomic<uint64_t> evictionCursor;
std::atomic<uint64_t> size;
const uint64_t capacity;
std::unique_ptr<std::atomic<EvictionCandidate>[]> data;
};

/**
Expand Down Expand Up @@ -194,10 +203,15 @@ class BufferManager {
inline common::frame_group_idx_t addNewFrameGroup(common::PageSizeClass pageSizeClass) {
return vmRegions[pageSizeClass]->addNewFrameGroup();
}
inline void clearEvictionQueue() { evictionQueue = std::make_unique<EvictionQueue>(); }

inline uint64_t getUsedMemory() const { return usedMemory; }

// Not thread-safe
uint32_t addFileHandle(BMFileHandle& fileHandle) {
fileHandles.push_back(&fileHandle);
return fileHandles.size() - 1;
}

private:
static void verifySizeParams(uint64_t bufferPoolSize, uint64_t maxDBSize);

Expand All @@ -207,17 +221,14 @@ class BufferManager {
bool claimAFrame(BMFileHandle& fileHandle, common::page_idx_t pageIdx,
PageReadPolicy pageReadPolicy);
// Return number of bytes freed.
uint64_t tryEvictPage(EvictionCandidate& candidate);
uint64_t tryEvictPage(std::atomic<EvictionCandidate>& candidate);

void cachePageIntoFrame(BMFileHandle& fileHandle, common::page_idx_t pageIdx,
PageReadPolicy pageReadPolicy);
void flushIfDirtyWithoutLock(BMFileHandle& fileHandle, common::page_idx_t pageIdx);
void removePageFromFrame(BMFileHandle& fileHandle, common::page_idx_t pageIdx,
bool shouldFlush);

void addToEvictionQueue(BMFileHandle* fileHandle, common::page_idx_t pageIdx,
PageState* pageState);

inline uint64_t reserveUsedMemory(uint64_t size) { return usedMemory.fetch_add(size); }
inline uint64_t freeUsedMemory(uint64_t size) {
KU_ASSERT(usedMemory.load() >= size);
Expand All @@ -231,14 +242,16 @@ class BufferManager {
vmRegions[fileHandle.getPageSizeClass()]->releaseFrame(fileHandle.getFrameIdx(pageIdx));
}

uint64_t evictPages();

private:
std::atomic<uint64_t> usedMemory;
std::atomic<uint64_t> bufferPoolSize;
std::atomic<uint64_t> numEvictionQueueInsertions;
EvictionQueue evictionQueue;
std::atomic<uint64_t> usedMemory;
// Each VMRegion corresponds to a virtual memory region of a specific page size. Currently, we
// hold two sizes of PAGE_4KB and PAGE_256KB.
std::vector<std::unique_ptr<VMRegion>> vmRegions;
std::unique_ptr<EvictionQueue> evictionQueue;
std::vector<BMFileHandle*> fileHandles;
};

} // namespace storage
Expand Down
4 changes: 1 addition & 3 deletions src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ Database::Database(std::string_view databasePath, SystemConfig systemConfig)
databaseManager = std::make_unique<DatabaseManager>();
}

Database::~Database() {
bufferManager->clearEvictionQueue();
}
Database::~Database() {}

void Database::addTableFunction(std::string name, function::function_set functionSet) {
catalog->addBuiltInFunction(CatalogEntryType::TABLE_FUNCTION_ENTRY, std::move(name),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer_manager/bm_file_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ BMFileHandle::BMFileHandle(const std::string& path, uint8_t flags, BufferManager
common::VirtualFileSystem* vfs, main::ClientContext* context)
: FileHandle{path, flags, vfs, context}, fileVersionedType{fileVersionedType}, bm{bm},
pageSizeClass{pageSizeClass}, pageStates{numPages, pageCapacity},
frameGroupIdxes{getNumPageGroups(), getNumPageGroups()} {
frameGroupIdxes{getNumPageGroups(), getNumPageGroups()}, fileIndex{bm->addFileHandle(*this)} {
for (auto i = 0u; i < frameGroupIdxes.size(); i++) {
frameGroupIdxes[i] = bm->addNewFrameGroup(pageSizeClass);
}
Expand Down
Loading

0 comments on commit 8e455e4

Please sign in to comment.