Skip to content

Commit

Permalink
Cache files when replaying WAL
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Mar 26, 2024
1 parent e16b1e0 commit 5eb42af
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
14 changes: 14 additions & 0 deletions src/include/storage/wal/wal_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "common/enums/table_type.h"
#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "function/hash/hash_functions.h"

namespace kuzu {
namespace storage {
Expand Down Expand Up @@ -234,3 +235,16 @@ struct WALRecord {

} // namespace storage
} // namespace kuzu

namespace std {
template<>
struct hash<kuzu::storage::DBFileID> {
size_t operator()(const kuzu::storage::DBFileID& fileId) const {
auto dbFileTypeHash = std::hash<uint8_t>()(static_cast<uint8_t>(fileId.dbFileType));
auto isOverflowHash = std::hash<bool>()(fileId.isOverflow);
auto nodeIndexIDHash = std::hash<kuzu::common::table_id_t>()(fileId.nodeIndexID.tableID);
return kuzu::function::combineHashScalar(
dbFileTypeHash, kuzu::function::combineHashScalar(isOverflowHash, nodeIndexIDHash));
}
};
} // namespace std
6 changes: 4 additions & 2 deletions src/include/storage/wal_replayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ class WALReplayer {

private:
void init();
void replayWALRecord(WALRecord& walRecord);
void replayPageUpdateOrInsertRecord(const WALRecord& walRecord);
void replayWALRecord(WALRecord& walRecord,
std::unordered_map<DBFileID, std::unique_ptr<common::FileInfo>>& fileCache);
void replayPageUpdateOrInsertRecord(const WALRecord& walRecord,
std::unordered_map<DBFileID, std::unique_ptr<common::FileInfo>>& fileCache);
void replayTableStatisticsRecord(const WALRecord& walRecord);
void replayCatalogRecord();
void replayCreateTableRecord(const WALRecord& walRecord);
Expand Down
25 changes: 19 additions & 6 deletions src/storage/wal_replayer.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
#include "storage/wal_replayer.h"

#include <unordered_map>

#include "catalog/catalog_entry/node_table_catalog_entry.h"
#include "common/exception/storage.h"
#include "common/file_system/file_info.h"
#include "storage/storage_manager.h"
#include "storage/storage_utils.h"
#include "storage/store/node_table.h"
#include "storage/wal/wal_record.h"
#include "storage/wal_replayer_utils.h"
#include "transaction/transaction.h"

Expand Down Expand Up @@ -41,9 +46,10 @@ void WALReplayer::replay() {
if (!wal->isEmptyWAL()) {
auto walIterator = wal->getIterator();
WALRecord walRecord;
std::unordered_map<DBFileID, std::unique_ptr<FileInfo>> fileCache;
while (walIterator->hasNextRecord()) {
walIterator->getNextRecord(walRecord);
replayWALRecord(walRecord);
replayWALRecord(walRecord, fileCache);
}
}
// We next perform an in-memory checkpointing or rolling back of node/relTables.
Expand All @@ -56,10 +62,11 @@ void WALReplayer::replay() {
}
}

void WALReplayer::replayWALRecord(WALRecord& walRecord) {
void WALReplayer::replayWALRecord(
WALRecord& walRecord, std::unordered_map<DBFileID, std::unique_ptr<FileInfo>>& fileCache) {
switch (walRecord.recordType) {
case WALRecordType::PAGE_UPDATE_OR_INSERT_RECORD: {
replayPageUpdateOrInsertRecord(walRecord);
replayPageUpdateOrInsertRecord(walRecord, fileCache);
} break;
case WALRecordType::TABLE_STATISTICS_RECORD: {
replayTableStatisticsRecord(walRecord);
Expand Down Expand Up @@ -94,12 +101,18 @@ void WALReplayer::replayWALRecord(WALRecord& walRecord) {
}
}

void WALReplayer::replayPageUpdateOrInsertRecord(const WALRecord& walRecord) {
void WALReplayer::replayPageUpdateOrInsertRecord(const WALRecord& walRecord,
std::unordered_map<DBFileID, std::unique_ptr<FileInfo>>& fileCache) {
// 1. As the first step we copy over the page on disk, regardless of if we are recovering
// (and checkpointing) or checkpointing while during regular execution.
auto dbFileID = walRecord.pageInsertOrUpdateRecord.dbFileID;
std::unique_ptr<FileInfo> fileInfoOfDBFile =
StorageUtils::getFileInfoForReadWrite(wal->getDirectory(), dbFileID, vfs);
auto entry = fileCache.find(dbFileID);
if (entry == fileCache.end()) {
fileCache.insert(std::make_pair(
dbFileID, StorageUtils::getFileInfoForReadWrite(wal->getDirectory(), dbFileID, vfs)));
entry = fileCache.find(dbFileID);
}
auto& fileInfoOfDBFile = entry->second;
if (isCheckpoint) {
if (!wal->isLastLoggedRecordCommit()) {
// Nothing to undo.
Expand Down

0 comments on commit 5eb42af

Please sign in to comment.