Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bm file handle remove pages in eviction queue #1431

Merged
merged 1 commit into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/main/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ class Database {
private:
std::string databasePath;
SystemConfig systemConfig;
std::unique_ptr<storage::BufferManager> bufferManager;
std::unique_ptr<storage::MemoryManager> memoryManager;
std::unique_ptr<processor::QueryProcessor> queryProcessor;
std::unique_ptr<storage::BufferManager> bufferManager;
std::unique_ptr<catalog::Catalog> catalog;
std::unique_ptr<storage::StorageManager> storageManager;
std::unique_ptr<transaction::TransactionManager> transactionManager;
Expand Down
14 changes: 7 additions & 7 deletions src/include/processor/operator/copy/copy_rel.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "processor/operator/copy/copy.h"
#include "storage/store/nodes_store.h"
#include "storage/store/rels_store.h"

namespace kuzu {
Expand All @@ -9,16 +10,15 @@ namespace processor {
class CopyRel : public Copy {
public:
CopyRel(catalog::Catalog* catalog, common::CopyDescription copyDescription,
common::table_id_t tableID, storage::WAL* wal,
storage::NodesStatisticsAndDeletedIDs* nodesStatistics,
storage::RelsStatistics* relsStatistics, uint32_t id, const std::string& paramsString)
common::table_id_t tableID, storage::WAL* wal, storage::RelsStatistics* relsStatistics,
storage::NodesStore& nodesStore, uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_REL, catalog, std::move(copyDescription), tableID, wal,
id, paramsString},
nodesStatistics{nodesStatistics}, relsStatistics{relsStatistics} {}
relsStatistics{relsStatistics}, nodesStore{nodesStore} {}

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyRel>(catalog, copyDescription, tableID, wal, nodesStatistics,
relsStatistics, id, paramsString);
return make_unique<CopyRel>(
catalog, copyDescription, tableID, wal, relsStatistics, nodesStore, id, paramsString);
}

protected:
Expand All @@ -31,8 +31,8 @@ class CopyRel : public Copy {
}

private:
storage::NodesStatisticsAndDeletedIDs* nodesStatistics;
storage::RelsStatistics* relsStatistics;
storage::NodesStore& nodesStore;
};

} // namespace processor
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/buffer_manager/bm_file_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class BMFileHandle : public FileHandle {
BMFileHandle(const std::string& path, uint8_t flags, BufferManager* bm,
common::PageSizeClass pageSizeClass, FileVersionedType fileVersionedType);

~BMFileHandle();

// This function assumes the page is already LOCKED.
inline void setLockedPageDirty(common::page_idx_t pageIdx) {
assert(pageIdx < numPages);
Expand Down
22 changes: 18 additions & 4 deletions src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,31 @@ struct EvictionCandidate {

class EvictionQueue {
public:
EvictionQueue() { queue = std::make_unique<moodycamel::ConcurrentQueue<EvictionCandidate>>(); }
explicit EvictionQueue(uint64_t capacity) : capacity{capacity} {
queue = std::make_unique<moodycamel::ConcurrentQueue<EvictionCandidate>>(capacity);
}

inline void enqueue(EvictionCandidate& candidate) { queue->enqueue(candidate); }
inline void enqueue(EvictionCandidate& candidate) {
std::shared_lock sLck{mtx};
queue->enqueue(candidate);
}
inline void enqueue(BMFileHandle* fileHandle, common::page_idx_t pageIdx, PageState* pageState,
uint64_t pageVersion) {
std::shared_lock sLck{mtx};
queue->enqueue(EvictionCandidate{fileHandle, pageIdx, pageState, pageVersion});
}
inline bool dequeue(EvictionCandidate& candidate) { return queue->try_dequeue(candidate); }
inline bool dequeue(EvictionCandidate& candidate) {
std::shared_lock sLck{mtx};
return queue->try_dequeue(candidate);
}

void removeNonEvictableCandidates();

void removeCandidatesForFile(BMFileHandle& fileHandle);

private:
std::shared_mutex mtx;
uint64_t capacity;
std::unique_ptr<moodycamel::ConcurrentQueue<EvictionCandidate>> queue;
};

Expand Down Expand Up @@ -152,7 +166,7 @@ class BufferManager {
enum class PageReadPolicy : uint8_t { READ_PAGE = 0, DONT_READ_PAGE = 1 };

explicit BufferManager(uint64_t bufferPoolSize);
~BufferManager();
~BufferManager() = default;

uint8_t* pin(BMFileHandle& fileHandle, common::page_idx_t pageIdx,
PageReadPolicy pageReadPolicy = PageReadPolicy::READ_PAGE);
Expand Down
10 changes: 6 additions & 4 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "storage/index/hash_index.h"
#include "storage/store/nodes_store.h"
#include "storage/store/rels_statistics.h"
#include "table_copier.h"

Expand All @@ -16,8 +17,8 @@ class RelCopier : public TableCopier {
public:
RelCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerNodeTable,
BufferManager* bufferManager, common::table_id_t tableID, RelsStatistics* relsStatistics);
storage::NodesStore& nodesStore, BufferManager* bufferManager, common::table_id_t tableID,
RelsStatistics* relsStatistics);

private:
static std::string getTaskTypeName(PopulateTaskType populateTaskType);
Expand Down Expand Up @@ -67,7 +68,7 @@ class RelCopier : public TableCopier {
template<typename T>
static void inferTableIDsAndOffsets(const std::vector<std::shared_ptr<T>>& batchColumns,
std::vector<common::nodeID_t>& nodeIDs, std::vector<common::DataType>& nodeIDTypes,
const std::map<common::table_id_t, std::unique_ptr<PrimaryKeyIndex>>& pkIndexes,
const std::map<common::table_id_t, PrimaryKeyIndex*>& pkIndexes,
transaction::Transaction* transaction, int64_t blockOffset, int64_t& colIndex);

template<typename T>
Expand Down Expand Up @@ -140,9 +141,10 @@ class RelCopier : public TableCopier {
const std::shared_ptr<spdlog::logger>& logger);

private:
storage::NodesStore& nodesStore;
const std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerTable;
std::unique_ptr<transaction::Transaction> dummyReadOnlyTrx;
std::map<common::table_id_t, std::unique_ptr<PrimaryKeyIndex>> pkIndexes;
std::map<common::table_id_t, PrimaryKeyIndex*> pkIndexes;
std::atomic<uint64_t> numRels = 0;
std::vector<std::unique_ptr<atomic_uint64_vec_t>> listSizesPerDirection{2};
std::vector<std::unique_ptr<InMemAdjColumn>> adjColumnsPerDirection{2};
Expand Down
4 changes: 2 additions & 2 deletions src/processor/mapper/map_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyToPhysical(
getOperatorID(), copy->getExpressionsForPrinting());
} else {
return std::make_unique<CopyRel>(catalog, copy->getCopyDescription(), copy->getTableID(),
storageManager.getWAL(), nodesStatistics, relsStatistics, getOperatorID(),
copy->getExpressionsForPrinting());
storageManager.getWAL(), relsStatistics, storageManager.getNodesStore(),
getOperatorID(), copy->getExpressionsForPrinting());
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ namespace processor {
uint64_t CopyRel::executeInternal(
kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto relCSVCopier = make_unique<RelCopier>(copyDescription, wal->getDirectory(), *taskScheduler,
*catalog, nodesStatistics->getMaxNodeOffsetPerTable(), executionContext->bufferManager,
tableID, relsStatistics);
*catalog, nodesStore, executionContext->bufferManager, tableID, relsStatistics);
auto numRelsCopied = relCSVCopier->copy();
wal->logCopyRelRecord(tableID);
return numRelsCopied;
Expand Down
4 changes: 4 additions & 0 deletions src/storage/buffer_manager/bm_file_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ BMFileHandle::BMFileHandle(const std::string& path, uint8_t flags, BufferManager
initPageStatesAndGroups();
}

BMFileHandle::~BMFileHandle() {
bm->removeFilePagesFromFrames(*this);
}

void BMFileHandle::initPageStatesAndGroups() {
pageStates.resize(pageCapacity);
for (auto i = 0ull; i < numPages; i++) {
Expand Down
26 changes: 21 additions & 5 deletions src/storage/buffer_manager/buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ using namespace kuzu::common;

namespace kuzu {
namespace storage {

// In this function, we try to remove as many as possible candidates that are not evictable from the
// eviction queue until we hit a candidate that is evictable.
// 1) If the candidate page's version has changed, which means the page was pinned and unpinned, we
Expand All @@ -18,8 +17,9 @@ namespace storage {
// as MARKED, and moving the candidate to the back of the queue.
// 3) If the candidate page's state is LOCKED, we remove the candidate from the queue.
void EvictionQueue::removeNonEvictableCandidates() {
EvictionCandidate evictionCandidate;
std::shared_lock sLck{mtx};
while (true) {
EvictionCandidate evictionCandidate;
if (!queue->try_dequeue(evictionCandidate)) {
break;
}
Expand All @@ -43,19 +43,34 @@ void EvictionQueue::removeNonEvictableCandidates() {
}
}

void EvictionQueue::removeCandidatesForFile(kuzu::storage::BMFileHandle& fileHandle) {
std::unique_lock xLck{mtx};
EvictionCandidate candidate;
uint64_t loopedCandidateIdx = 0;
auto numCandidatesInQueue = queue->size_approx();
while (loopedCandidateIdx < numCandidatesInQueue && queue->try_dequeue(candidate)) {
if (candidate.fileHandle != &fileHandle) {
queue->enqueue(candidate);
}
loopedCandidateIdx++;
}
}

BufferManager::BufferManager(uint64_t bufferPoolSize)
: logger{LoggerUtils::getLogger(common::LoggerConstants::LoggerEnum::BUFFER_MANAGER)},
usedMemory{0}, bufferPoolSize{bufferPoolSize}, numEvictionQueueInsertions{0} {
logger->info("Done initializing buffer manager.");
if (bufferPoolSize < BufferPoolConstants::PAGE_4KB_SIZE) {
throw BufferManagerException("The given buffer pool size should be at least 4KB.");
}
vmRegions.resize(2);
vmRegions[0] = std::make_unique<VMRegion>(
PageSizeClass::PAGE_4KB, BufferPoolConstants::DEFAULT_VM_REGION_MAX_SIZE);
vmRegions[1] = std::make_unique<VMRegion>(PageSizeClass::PAGE_256KB, bufferPoolSize);
evictionQueue = std::make_unique<EvictionQueue>();
evictionQueue =
std::make_unique<EvictionQueue>(bufferPoolSize / BufferPoolConstants::PAGE_4KB_SIZE);
}

BufferManager::~BufferManager() = default;

// Important Note: Pin returns a raw pointer to the frame. This is potentially very dangerous and
// trusts the caller is going to protect this memory space.
// Important responsibilities for the caller are:
Expand Down Expand Up @@ -225,6 +240,7 @@ void BufferManager::flushIfDirtyWithoutLock(BMFileHandle& fileHandle, common::pa
}

void BufferManager::removeFilePagesFromFrames(BMFileHandle& fileHandle) {
evictionQueue->removeCandidatesForFile(fileHandle);
for (auto pageIdx = 0u; pageIdx < fileHandle.getNumPages(); ++pageIdx) {
removePageFromFrame(fileHandle, pageIdx, false /* do not flush */);
}
Expand Down
19 changes: 8 additions & 11 deletions src/storage/copier/rel_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ namespace kuzu {
namespace storage {

RelCopier::RelCopier(CopyDescription& copyDescription, std::string outputDirectory,
TaskScheduler& taskScheduler, Catalog& catalog,
std::map<table_id_t, offset_t> maxNodeOffsetsPerNodeTable, BufferManager* bufferManager,
table_id_t tableID, RelsStatistics* relsStatistics)
TaskScheduler& taskScheduler, Catalog& catalog, storage::NodesStore& nodesStore,
BufferManager* bufferManager, table_id_t tableID, RelsStatistics* relsStatistics)
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID,
relsStatistics},
maxNodeOffsetsPerTable{std::move(maxNodeOffsetsPerNodeTable)} {
nodesStore{nodesStore},
maxNodeOffsetsPerTable{
nodesStore.getNodesStatisticsAndDeletedIDs().getMaxNodeOffsetPerTable()} {
dummyReadOnlyTrx = Transaction::getDummyReadOnlyTrx();
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
initializePkIndexes(relTableSchema->srcTableID, *bufferManager);
Expand Down Expand Up @@ -171,11 +172,7 @@ void RelCopier::initListsMetadata() {
}

void RelCopier::initializePkIndexes(table_id_t nodeTableID, BufferManager& bufferManager) {
pkIndexes.emplace(nodeTableID,
std::make_unique<PrimaryKeyIndex>(
StorageUtils::getNodeIndexIDAndFName(this->outputDirectory, nodeTableID),
catalog.getReadOnlyVersion()->getNodeTableSchema(nodeTableID)->getPrimaryKey().dataType,
bufferManager, nullptr /* wal */));
pkIndexes.emplace(nodeTableID, nodesStore.getPKIndex(nodeTableID));
}

arrow::Status RelCopier::executePopulateTask(PopulateTaskType populateTaskType) {
Expand Down Expand Up @@ -342,8 +339,8 @@ void RelCopier::sortAndCopyOverflowValues() {
template<typename T>
void RelCopier::inferTableIDsAndOffsets(const std::vector<std::shared_ptr<T>>& batchColumns,
std::vector<nodeID_t>& nodeIDs, std::vector<DataType>& nodeIDTypes,
const std::map<table_id_t, std::unique_ptr<PrimaryKeyIndex>>& pkIndexes,
Transaction* transaction, int64_t blockOffset, int64_t& colIndex) {
const std::map<table_id_t, PrimaryKeyIndex*>& pkIndexes, Transaction* transaction,
int64_t blockOffset, int64_t& colIndex) {
for (auto& relDirection : REL_DIRECTIONS) {
if (colIndex >= batchColumns.size()) {
throw CopyException("Number of columns mismatch.");
Expand Down
1 change: 0 additions & 1 deletion tools/python_api/test/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ def test_to_arrow_complex(establish_connection):
conn, db = establish_connection

def _test_node(_conn):
# query = "MATCH (a:person) RETURN a.ID, a.fName, a.gender, a.isStudent, a.isWorker, a.age, a.eyeSight, a.birthdate, a.registerTime, a.lastJobDuration, a.workedHours, a.usedNames, a.courseScoresPerTerm ORDER BY a.ID;"
query = "MATCH (p:person) RETURN p ORDER BY p.ID"
query_result = _conn.execute(query)
arrow_tbl = query_result.get_as_arrow(12)
Expand Down