Skip to content

Commit

Permalink
fix bm file handle remove pages in eviction queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Apr 3, 2023
1 parent c85bb1c commit 7ee24f2
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/include/main/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ class Database {
private:
std::string databasePath;
SystemConfig systemConfig;
std::unique_ptr<storage::BufferManager> bufferManager;
std::unique_ptr<storage::MemoryManager> memoryManager;
std::unique_ptr<processor::QueryProcessor> queryProcessor;
std::unique_ptr<storage::BufferManager> bufferManager;
std::unique_ptr<catalog::Catalog> catalog;
std::unique_ptr<storage::StorageManager> storageManager;
std::unique_ptr<transaction::TransactionManager> transactionManager;
Expand Down
14 changes: 7 additions & 7 deletions src/include/processor/operator/copy/copy_rel.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "processor/operator/copy/copy.h"
#include "storage/store/nodes_store.h"
#include "storage/store/rels_store.h"

namespace kuzu {
Expand All @@ -9,16 +10,15 @@ namespace processor {
class CopyRel : public Copy {
public:
CopyRel(catalog::Catalog* catalog, common::CopyDescription copyDescription,
common::table_id_t tableID, storage::WAL* wal,
storage::NodesStatisticsAndDeletedIDs* nodesStatistics,
storage::RelsStatistics* relsStatistics, uint32_t id, const std::string& paramsString)
common::table_id_t tableID, storage::WAL* wal, storage::RelsStatistics* relsStatistics,
storage::NodesStore& nodesStore, uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_REL, catalog, std::move(copyDescription), tableID, wal,
id, paramsString},
nodesStatistics{nodesStatistics}, relsStatistics{relsStatistics} {}
relsStatistics{relsStatistics}, nodesStore{nodesStore} {}

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyRel>(catalog, copyDescription, tableID, wal, nodesStatistics,
relsStatistics, id, paramsString);
return make_unique<CopyRel>(
catalog, copyDescription, tableID, wal, relsStatistics, nodesStore, id, paramsString);
}

protected:
Expand All @@ -31,8 +31,8 @@ class CopyRel : public Copy {
}

private:
storage::NodesStatisticsAndDeletedIDs* nodesStatistics;
storage::RelsStatistics* relsStatistics;
storage::NodesStore& nodesStore;
};

} // namespace processor
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/buffer_manager/bm_file_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class BMFileHandle : public FileHandle {
BMFileHandle(const std::string& path, uint8_t flags, BufferManager* bm,
common::PageSizeClass pageSizeClass, FileVersionedType fileVersionedType);

~BMFileHandle();

// This function assumes the page is already LOCKED.
inline void setLockedPageDirty(common::page_idx_t pageIdx) {
assert(pageIdx < numPages);
Expand Down
22 changes: 18 additions & 4 deletions src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,31 @@ struct EvictionCandidate {

class EvictionQueue {
public:
EvictionQueue() { queue = std::make_unique<moodycamel::ConcurrentQueue<EvictionCandidate>>(); }
explicit EvictionQueue(uint64_t capacity) : capacity{capacity} {
queue = std::make_unique<moodycamel::ConcurrentQueue<EvictionCandidate>>(capacity);
}

inline void enqueue(EvictionCandidate& candidate) { queue->enqueue(candidate); }
inline void enqueue(EvictionCandidate& candidate) {
std::shared_lock sLck{mtx};
queue->enqueue(candidate);
}
inline void enqueue(BMFileHandle* fileHandle, common::page_idx_t pageIdx, PageState* pageState,
uint64_t pageVersion) {
std::shared_lock sLck{mtx};
queue->enqueue(EvictionCandidate{fileHandle, pageIdx, pageState, pageVersion});
}
inline bool dequeue(EvictionCandidate& candidate) { return queue->try_dequeue(candidate); }
inline bool dequeue(EvictionCandidate& candidate) {
std::shared_lock sLck{mtx};
return queue->try_dequeue(candidate);
}

void removeNonEvictableCandidates();

void removeCandidatesForFile(BMFileHandle& fileHandle);

private:
std::shared_mutex mtx;
uint64_t capacity;
std::unique_ptr<moodycamel::ConcurrentQueue<EvictionCandidate>> queue;
};

Expand Down Expand Up @@ -152,7 +166,7 @@ class BufferManager {
enum class PageReadPolicy : uint8_t { READ_PAGE = 0, DONT_READ_PAGE = 1 };

explicit BufferManager(uint64_t bufferPoolSize);
~BufferManager();
~BufferManager() = default;

uint8_t* pin(BMFileHandle& fileHandle, common::page_idx_t pageIdx,
PageReadPolicy pageReadPolicy = PageReadPolicy::READ_PAGE);
Expand Down
10 changes: 6 additions & 4 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "storage/index/hash_index.h"
#include "storage/store/nodes_store.h"
#include "storage/store/rels_statistics.h"
#include "table_copier.h"

Expand All @@ -16,8 +17,8 @@ class RelCopier : public TableCopier {
public:
RelCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerNodeTable,
BufferManager* bufferManager, common::table_id_t tableID, RelsStatistics* relsStatistics);
storage::NodesStore& nodesStore, BufferManager* bufferManager, common::table_id_t tableID,
RelsStatistics* relsStatistics);

private:
static std::string getTaskTypeName(PopulateTaskType populateTaskType);
Expand Down Expand Up @@ -67,7 +68,7 @@ class RelCopier : public TableCopier {
template<typename T>
static void inferTableIDsAndOffsets(const std::vector<std::shared_ptr<T>>& batchColumns,
std::vector<common::nodeID_t>& nodeIDs, std::vector<common::DataType>& nodeIDTypes,
const std::map<common::table_id_t, std::unique_ptr<PrimaryKeyIndex>>& pkIndexes,
const std::map<common::table_id_t, PrimaryKeyIndex*>& pkIndexes,
transaction::Transaction* transaction, int64_t blockOffset, int64_t& colIndex);

template<typename T>
Expand Down Expand Up @@ -140,9 +141,10 @@ class RelCopier : public TableCopier {
const std::shared_ptr<spdlog::logger>& logger);

private:
storage::NodesStore& nodesStore;
const std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerTable;
std::unique_ptr<transaction::Transaction> dummyReadOnlyTrx;
std::map<common::table_id_t, std::unique_ptr<PrimaryKeyIndex>> pkIndexes;
std::map<common::table_id_t, PrimaryKeyIndex*> pkIndexes;
std::atomic<uint64_t> numRels = 0;
std::vector<std::unique_ptr<atomic_uint64_vec_t>> listSizesPerDirection{2};
std::vector<std::unique_ptr<InMemAdjColumn>> adjColumnsPerDirection{2};
Expand Down
4 changes: 2 additions & 2 deletions src/processor/mapper/map_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyToPhysical(
getOperatorID(), copy->getExpressionsForPrinting());
} else {
return std::make_unique<CopyRel>(catalog, copy->getCopyDescription(), copy->getTableID(),
storageManager.getWAL(), nodesStatistics, relsStatistics, getOperatorID(),
copy->getExpressionsForPrinting());
storageManager.getWAL(), relsStatistics, storageManager.getNodesStore(),
getOperatorID(), copy->getExpressionsForPrinting());
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ namespace processor {
uint64_t CopyRel::executeInternal(
kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto relCSVCopier = make_unique<RelCopier>(copyDescription, wal->getDirectory(), *taskScheduler,
*catalog, nodesStatistics->getMaxNodeOffsetPerTable(), executionContext->bufferManager,
tableID, relsStatistics);
*catalog, nodesStore, executionContext->bufferManager, tableID, relsStatistics);
auto numRelsCopied = relCSVCopier->copy();
wal->logCopyRelRecord(tableID);
return numRelsCopied;
Expand Down
4 changes: 4 additions & 0 deletions src/storage/buffer_manager/bm_file_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ BMFileHandle::BMFileHandle(const std::string& path, uint8_t flags, BufferManager
initPageStatesAndGroups();
}

// Detaches this file from the buffer manager on destruction: removeFilePagesFromFrames drops the
// file's candidates from the eviction queue and removes its pages from frames without flushing.
// NOTE(review): dereferences bm, so the buffer manager must outlive this handle — confirm the
// destruction order at the owning site.
BMFileHandle::~BMFileHandle() {
bm->removeFilePagesFromFrames(*this);
}

void BMFileHandle::initPageStatesAndGroups() {
pageStates.resize(pageCapacity);
for (auto i = 0ull; i < numPages; i++) {
Expand Down
25 changes: 20 additions & 5 deletions src/storage/buffer_manager/buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ using namespace kuzu::common;

namespace kuzu {
namespace storage {

// In this function, we try to remove as many non-evictable candidates as possible from the
// eviction queue until we hit a candidate that is evictable.
// 1) If the candidate page's version has changed, which means the page was pinned and unpinned, we
Expand All @@ -18,8 +17,9 @@ namespace storage {
// as MARKED, and moving the candidate to the back of the queue.
// 3) If the candidate page's state is LOCKED, we remove the candidate from the queue.
void EvictionQueue::removeNonEvictableCandidates() {
EvictionCandidate evictionCandidate;
std::shared_lock sLck{mtx};
while (true) {
EvictionCandidate evictionCandidate;
if (!queue->try_dequeue(evictionCandidate)) {
break;
}
Expand All @@ -43,19 +43,33 @@ void EvictionQueue::removeNonEvictableCandidates() {
}
}

// Purges every eviction candidate that belongs to `fileHandle` from the queue.
// The queue is scanned exactly once under an exclusive lock: each candidate present at the start
// of the scan is dequeued, and only those that reference a different file handle are put back.
void EvictionQueue::removeCandidatesForFile(kuzu::storage::BMFileHandle& fileHandle) {
    std::unique_lock xLck{mtx};
    // Snapshot the size up front so that re-enqueued candidates are not visited a second time.
    const auto numToScan = queue->size_approx();
    EvictionCandidate current;
    for (uint64_t numScanned = 0; numScanned < numToScan; ++numScanned) {
        if (!queue->try_dequeue(current)) {
            // Queue drained earlier than the snapshot suggested; nothing left to filter.
            break;
        }
        if (current.fileHandle == &fileHandle) {
            // Candidate belongs to the file being removed: drop it.
            continue;
        }
        queue->enqueue(current);
    }
}

BufferManager::BufferManager(uint64_t bufferPoolSize)
: logger{LoggerUtils::getLogger(common::LoggerConstants::LoggerEnum::BUFFER_MANAGER)},
usedMemory{0}, bufferPoolSize{bufferPoolSize}, numEvictionQueueInsertions{0} {
logger->info("Done initializing buffer manager.");
if (bufferPoolSize < BufferPoolConstants::PAGE_4KB_SIZE) {
throw BufferManagerException("The given buffer pool size should be at least 4KB.");
}
vmRegions.resize(2);
vmRegions[0] = std::make_unique<VMRegion>(
PageSizeClass::PAGE_4KB, BufferPoolConstants::DEFAULT_VM_REGION_MAX_SIZE);
vmRegions[1] = std::make_unique<VMRegion>(PageSizeClass::PAGE_256KB, bufferPoolSize);
evictionQueue = std::make_unique<EvictionQueue>();
evictionQueue = std::make_unique<EvictionQueue>(bufferPoolSize / BufferPoolConstants::PAGE_4KB_SIZE);
}

BufferManager::~BufferManager() = default;

// Important Note: Pin returns a raw pointer to the frame. This is potentially very dangerous and
// trusts the caller is going to protect this memory space.
// Important responsibilities for the caller are:
Expand Down Expand Up @@ -225,6 +239,7 @@ void BufferManager::flushIfDirtyWithoutLock(BMFileHandle& fileHandle, common::pa
}

void BufferManager::removeFilePagesFromFrames(BMFileHandle& fileHandle) {
evictionQueue->removeCandidatesForFile(fileHandle);
for (auto pageIdx = 0u; pageIdx < fileHandle.getNumPages(); ++pageIdx) {
removePageFromFrame(fileHandle, pageIdx, false /* do not flush */);
}
Expand Down
19 changes: 8 additions & 11 deletions src/storage/copier/rel_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ namespace kuzu {
namespace storage {

RelCopier::RelCopier(CopyDescription& copyDescription, std::string outputDirectory,
TaskScheduler& taskScheduler, Catalog& catalog,
std::map<table_id_t, offset_t> maxNodeOffsetsPerNodeTable, BufferManager* bufferManager,
table_id_t tableID, RelsStatistics* relsStatistics)
TaskScheduler& taskScheduler, Catalog& catalog, storage::NodesStore& nodesStore,
BufferManager* bufferManager, table_id_t tableID, RelsStatistics* relsStatistics)
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID,
relsStatistics},
maxNodeOffsetsPerTable{std::move(maxNodeOffsetsPerNodeTable)} {
nodesStore{nodesStore},
maxNodeOffsetsPerTable{
nodesStore.getNodesStatisticsAndDeletedIDs().getMaxNodeOffsetPerTable()} {
dummyReadOnlyTrx = Transaction::getDummyReadOnlyTrx();
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
initializePkIndexes(relTableSchema->srcTableID, *bufferManager);
Expand Down Expand Up @@ -171,11 +172,7 @@ void RelCopier::initListsMetadata() {
}

void RelCopier::initializePkIndexes(table_id_t nodeTableID, BufferManager& bufferManager) {
pkIndexes.emplace(nodeTableID,
std::make_unique<PrimaryKeyIndex>(
StorageUtils::getNodeIndexIDAndFName(this->outputDirectory, nodeTableID),
catalog.getReadOnlyVersion()->getNodeTableSchema(nodeTableID)->getPrimaryKey().dataType,
bufferManager, nullptr /* wal */));
pkIndexes.emplace(nodeTableID, nodesStore.getPKIndex(nodeTableID));
}

arrow::Status RelCopier::executePopulateTask(PopulateTaskType populateTaskType) {
Expand Down Expand Up @@ -342,8 +339,8 @@ void RelCopier::sortAndCopyOverflowValues() {
template<typename T>
void RelCopier::inferTableIDsAndOffsets(const std::vector<std::shared_ptr<T>>& batchColumns,
std::vector<nodeID_t>& nodeIDs, std::vector<DataType>& nodeIDTypes,
const std::map<table_id_t, std::unique_ptr<PrimaryKeyIndex>>& pkIndexes,
Transaction* transaction, int64_t blockOffset, int64_t& colIndex) {
const std::map<table_id_t, PrimaryKeyIndex*>& pkIndexes, Transaction* transaction,
int64_t blockOffset, int64_t& colIndex) {
for (auto& relDirection : REL_DIRECTIONS) {
if (colIndex >= batchColumns.size()) {
throw CopyException("Number of columns mismatch.");
Expand Down
1 change: 0 additions & 1 deletion tools/python_api/test/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ def test_to_arrow_complex(establish_connection):
conn, db = establish_connection

def _test_node(_conn):
# query = "MATCH (a:person) RETURN a.ID, a.fName, a.gender, a.isStudent, a.isWorker, a.age, a.eyeSight, a.birthdate, a.registerTime, a.lastJobDuration, a.workedHours, a.usedNames, a.courseScoresPerTerm ORDER BY a.ID;"
query = "MATCH (p:person) RETURN p ORDER BY p.ID"
query_result = _conn.execute(query)
arrow_tbl = query_result.get_as_arrow(12)
Expand Down

0 comments on commit 7ee24f2

Please sign in to comment.