Skip to content

Commit

Permalink
Merge pull request #1895 from kuzudb/wal-da
Browse files Browse the repository at this point in the history
Move the initialization of metadada disk arrays to wal-based transaction mechanism
  • Loading branch information
ray6080 committed Aug 9, 2023
2 parents 765d6f3 + 7d5edb3 commit e8d1a08
Show file tree
Hide file tree
Showing 20 changed files with 79 additions and 97 deletions.
13 changes: 11 additions & 2 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,19 @@ void Catalog::renameTable(table_id_t tableID, const std::string& newName) {
catalogContentForWriteTrx->renameTable(tableID, newName);
}

void Catalog::addProperty(
void Catalog::addNodeProperty(table_id_t tableID, const std::string& propertyName,
std::unique_ptr<LogicalType> dataType, std::unique_ptr<MetadataDAHInfo> metadataDAHInfo) {
initCatalogContentForWriteTrxIfNecessary();
catalogContentForWriteTrx->getTableSchema(tableID)->addNodeProperty(
propertyName, std::move(dataType), std::move(metadataDAHInfo));
wal->logAddPropertyRecord(
tableID, catalogContentForWriteTrx->getTableSchema(tableID)->getPropertyID(propertyName));
}

void Catalog::addRelProperty(
table_id_t tableID, const std::string& propertyName, std::unique_ptr<LogicalType> dataType) {
initCatalogContentForWriteTrxIfNecessary();
catalogContentForWriteTrx->getTableSchema(tableID)->addProperty(
catalogContentForWriteTrx->getTableSchema(tableID)->addRelProperty(
propertyName, std::move(dataType));
wal->logAddPropertyRecord(
tableID, catalogContentForWriteTrx->getTableSchema(tableID)->getPropertyID(propertyName));
Expand Down
5 changes: 4 additions & 1 deletion src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ class Catalog {

void renameTable(common::table_id_t tableID, const std::string& newName);

void addProperty(common::table_id_t tableID, const std::string& propertyName,
void addNodeProperty(common::table_id_t tableID, const std::string& propertyName,
std::unique_ptr<common::LogicalType> dataType,
std::unique_ptr<MetadataDAHInfo> metadataDAHInfo);
void addRelProperty(common::table_id_t tableID, const std::string& propertyName,
std::unique_ptr<common::LogicalType> dataType);

void dropProperty(common::table_id_t tableID, common::property_id_t propertyID);
Expand Down
8 changes: 7 additions & 1 deletion src/include/catalog/table_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ class TableSchema {

std::vector<Property*> getProperties() const;

inline void addProperty(
inline void addNodeProperty(std::string propertyName,
std::unique_ptr<common::LogicalType> dataType,
std::unique_ptr<MetadataDAHInfo> metadataDAHInfo) {
properties.push_back(std::make_unique<Property>(std::move(propertyName),
std::move(dataType), increaseNextPropertyID(), tableID, std::move(metadataDAHInfo)));
}
inline void addRelProperty(
std::string propertyName, std::unique_ptr<common::LogicalType> dataType) {
properties.push_back(std::make_unique<Property>(
std::move(propertyName), std::move(dataType), increaseNextPropertyID(), tableID));
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/ddl/add_property.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class AddProperty : public DDL {
defaultValueEvaluator->init(*resultSet, context->memoryManager);
}

void executeDDLInternal() override;
void executeDDLInternal() override = 0;

std::string getOutputMsg() override { return {"Add Succeed."}; }

Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/storage_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ class StorageManager {
inline BMFileHandle* getDataFH() const { return dataFH.get(); }
inline BMFileHandle* getMetadataFH() const { return metadataFH.get(); }

std::unique_ptr<catalog::MetadataDAHInfo> initMetadataDAHInfo(
std::unique_ptr<catalog::MetadataDAHInfo> createMetadataDAHInfo(
const common::LogicalType& dataType);

private:
std::unique_ptr<BMFileHandle> dataFH;
std::unique_ptr<BMFileHandle> metadataFH;
catalog::Catalog& catalog;
MemoryManager& memoryManager;
WAL* wal;
std::unique_ptr<RelsStore> relsStore;
std::unique_ptr<NodesStore> nodesStore;
Expand Down
8 changes: 8 additions & 0 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,14 @@ class InMemDiskArray : public BaseInMemDiskArray<T> {
InMemDiskArray(FileHandle& fileHandle, StorageStructureID storageStructureID,
common::page_idx_t headerPageIdx, BufferManager* bufferManager, WAL* wal);

static inline common::page_idx_t addDAHPageToFile(BMFileHandle& fileHandle,
StorageStructureID storageStructureID, BufferManager* bufferManager, WAL* wal) {
DiskArrayHeader daHeader(sizeof(T));
return StorageStructureUtils::insertNewPage(fileHandle,
StorageStructureID{StorageStructureType::METADATA}, *bufferManager, *wal,
[&](uint8_t* frame) -> void { memcpy(frame, &daHeader, sizeof(DiskArrayHeader)); });
}

inline void checkpointInMemoryIfNecessary() override {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(true /* is checkpoint */);
Expand Down
2 changes: 0 additions & 2 deletions src/include/storage/storage_structure/storage_structure.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ class StorageStructure {
}

protected:
void addNewPageToFileHandle();

// If necessary creates a second version (backed by the WAL) of a page that contains the value
// that will be written to. The position of the value, which determines the original page to
// update, is computed from the given elementOffset and numElementsPerPage argument. Obtains
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class StorageStructureUtils {
static void readWALVersionOfPage(BMFileHandle& fileHandle, common::page_idx_t originalPageIdx,
BufferManager& bufferManager, WAL& wal, const std::function<void(uint8_t*)>& readOp);

static common::page_idx_t insertNewPage(
BMFileHandle& fileHandle, StorageStructureID storageStructureID,
BufferManager& bufferManager, WAL& wal,
std::function<void(uint8_t*)> insertOp = [](uint8_t*) -> void {
// DO NOTHING.
});

// Note: This function updates a page "transactionally", i.e., creates the WAL version of the
// page if it doesn't exist. For the original page to be updated, the current WRITE trx needs to
// commit and checkpoint.
Expand Down
3 changes: 0 additions & 3 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ class NodeColumn {
void writeValue(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom);

// TODO(Guodong): This is mostly duplicated with StorageStructure::addNewPageToFileHandle().
// Should be cleaned up later.
void addNewPageToDataFH();
// TODO(Guodong): This is mostly duplicated with
// StorageStructure::createWALVersionOfPageIfNecessaryForElement(). Should be cleared later.
WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(common::offset_t nodeOffset);
Expand Down
24 changes: 0 additions & 24 deletions src/include/storage/wal_replayer_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,6 @@ namespace storage {

class WALReplayerUtils {
public:
static inline void initPropertyMetadataDAsOnDisk(
catalog::Property& property, BMFileHandle* metadataFH) {
saveMetaDAs(metadataFH, *property.getMetadataDAHInfo());
}
static inline void initTableMetadataDAsOnDisk(
catalog::NodeTableSchema* tableSchema, BMFileHandle* metadataFH) {
for (auto& property : tableSchema->properties) {
initPropertyMetadataDAsOnDisk(*property, metadataFH);
}
}

static inline void removeHashIndexFile(
catalog::NodeTableSchema* tableSchema, const std::string& directory) {
fileOperationOnNodeFiles(
Expand Down Expand Up @@ -51,19 +40,6 @@ class WALReplayerUtils {
catalog::RelTableSchema* relTableSchema, common::property_id_t propertyID);

private:
static inline void saveMetaDAs(
BMFileHandle* metadataFH, const catalog::MetadataDAHInfo& metaDAHeaderInfo) {
std::make_unique<InMemDiskArrayBuilder<ColumnChunkMetadata>>(
*reinterpret_cast<FileHandle*>(metadataFH), metaDAHeaderInfo.dataDAHPageIdx, 0)
->saveToDisk();
std::make_unique<InMemDiskArrayBuilder<ColumnChunkMetadata>>(
*reinterpret_cast<FileHandle*>(metadataFH), metaDAHeaderInfo.nullDAHPageIdx, 0)
->saveToDisk();
for (auto& childMetaDAHeaderInfo : metaDAHeaderInfo.childrenInfos) {
saveMetaDAs(metadataFH, *childMetaDAHeaderInfo);
}
}

static inline void removeColumnFilesForPropertyIfExists(const std::string& directory,
common::table_id_t relTableID, common::table_id_t boundTableID,
common::RelDataDirection relDirection, common::property_id_t propertyID,
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/ddl/add_node_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ namespace kuzu {
namespace processor {

void AddNodeProperty::executeDDLInternal() {
AddProperty::executeDDLInternal();
auto property = catalog->getWriteVersion()->getNodeProperty(tableID, propertyName);
property->setMetadataDAHInfo(storageManager.initMetadataDAHInfo(*dataType));
auto metadataDAHInfo = storageManager.createMetadataDAHInfo(*dataType);
catalog->addNodeProperty(
tableID, propertyName, std::move(dataType), std::move(metadataDAHInfo));
}

} // namespace processor
Expand Down
6 changes: 1 addition & 5 deletions src/processor/operator/ddl/add_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@
namespace kuzu {
namespace processor {

void AddProperty::executeDDLInternal() {
defaultValueEvaluator->evaluate();
catalog->addProperty(tableID, propertyName, dataType->copy());
}

uint8_t* AddProperty::getDefaultVal() {
defaultValueEvaluator->evaluate();
auto expressionVector = defaultValueEvaluator->resultVector;
assert(defaultValueEvaluator->resultVector->dataType == *dataType);
auto posInExpressionVector = expressionVector->state->selVector->selectedPositions[0];
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/ddl/add_rel_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace kuzu {
namespace processor {

void AddRelProperty::executeDDLInternal() {
AddProperty::executeDDLInternal();
catalog->addRelProperty(tableID, propertyName, dataType->copy());
auto tableSchema = catalog->getWriteVersion()->getRelTableSchema(tableID);
auto property = tableSchema->getProperty(tableSchema->getPropertyID(propertyName));
StorageUtils::createFileForRelPropertyWithDefaultVal(
Expand Down
11 changes: 5 additions & 6 deletions src/processor/operator/ddl/create_node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ namespace kuzu {
namespace processor {

void CreateNodeTable::executeDDLInternal() {
auto newTableID = catalog->addNodeTableSchema(
tableName, primaryKeyIdx, catalog::Property::copyProperties(properties));
for (auto& property : properties) {
property->setMetadataDAHInfo(
storageManager.createMetadataDAHInfo(*property->getDataType()));
}
auto newTableID = catalog->addNodeTableSchema(tableName, primaryKeyIdx, std::move(properties));
nodesStatistics->addNodeStatisticsAndDeletedIDs(
catalog->getWriteVersion()->getNodeTableSchema(newTableID));
auto tableSchema = catalog->getWriteVersion()->getNodeTableSchema(newTableID);
for (auto& property : tableSchema->properties) {
property->setMetadataDAHInfo(storageManager.initMetadataDAHInfo(*property->getDataType()));
}
}

std::string CreateNodeTable::getOutputMsg() {
Expand Down
17 changes: 11 additions & 6 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace kuzu {
namespace storage {

StorageManager::StorageManager(Catalog& catalog, MemoryManager& memoryManager, WAL* wal)
: catalog{catalog}, wal{wal} {
: catalog{catalog}, memoryManager{memoryManager}, wal{wal} {
dataFH = memoryManager.getBufferManager()->getBMFileHandle(
StorageUtils::getDataFName(wal->getDirectory()),
FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS,
Expand All @@ -25,21 +25,26 @@ StorageManager::StorageManager(Catalog& catalog, MemoryManager& memoryManager, W
nodesStore->getNodesStatisticsAndDeletedIDs().setAdjListsAndColumns(relsStore.get());
}

std::unique_ptr<MetadataDAHInfo> StorageManager::initMetadataDAHInfo(const LogicalType& dataType) {
std::unique_ptr<MetadataDAHInfo> StorageManager::createMetadataDAHInfo(
const common::LogicalType& dataType) {
auto metadataDAHInfo = std::make_unique<MetadataDAHInfo>();
metadataDAHInfo->dataDAHPageIdx = metadataFH->addNewPage();
metadataDAHInfo->nullDAHPageIdx = metadataFH->addNewPage();
metadataDAHInfo->dataDAHPageIdx = InMemDiskArray<ColumnChunkMetadata>::addDAHPageToFile(
*metadataFH, StorageStructureID{StorageStructureType::METADATA},
memoryManager.getBufferManager(), wal);
metadataDAHInfo->nullDAHPageIdx = InMemDiskArray<ColumnChunkMetadata>::addDAHPageToFile(
*metadataFH, StorageStructureID{StorageStructureType::METADATA},
memoryManager.getBufferManager(), wal);
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::STRUCT: {
auto fields = StructType::getFields(&dataType);
metadataDAHInfo->childrenInfos.resize(fields.size());
for (auto i = 0u; i < fields.size(); i++) {
metadataDAHInfo->childrenInfos[i] = initMetadataDAHInfo(*fields[i]->getType());
metadataDAHInfo->childrenInfos[i] = createMetadataDAHInfo(*fields[i]->getType());
}
} break;
case PhysicalTypeID::VAR_LIST: {
metadataDAHInfo->childrenInfos.push_back(
initMetadataDAHInfo(*VarListType::getChildType(&dataType)));
createMetadataDAHInfo(*VarListType::getChildType(&dataType)));
} break;
default: {
// DO NOTHING.
Expand Down
2 changes: 1 addition & 1 deletion src/storage/storage_structure/disk_overflow_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void DiskOverflowFile::addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppen
// Note that if byteCursor.pos is already 0 the next operation keeps the nextBytePos
// where it is.
nextBytePosToWriteTo = (fileHandle->getNumPages() * BufferPoolConstants::PAGE_4KB_SIZE);
addNewPageToFileHandle();
StorageStructureUtils::insertNewPage(*fileHandle, storageStructureID, *bufferManager, *wal);
}
}

Expand Down
13 changes: 1 addition & 12 deletions src/storage/storage_structure/storage_structure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,14 @@ using namespace kuzu::transaction;
namespace kuzu {
namespace storage {

void StorageStructure::addNewPageToFileHandle() {
auto pageIdxInOriginalFile = fileHandle->addNewPage();
auto pageIdxInWAL = wal->logPageInsertRecord(storageStructureID, pageIdxInOriginalFile);
bufferManager->pin(
*wal->fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::DONT_READ_PAGE);
fileHandle->addWALPageIdxGroupIfNecessary(pageIdxInOriginalFile);
fileHandle->setWALPageIdx(pageIdxInOriginalFile, pageIdxInWAL);
wal->fileHandle->setLockedPageDirty(pageIdxInWAL);
bufferManager->unpin(*wal->fileHandle, pageIdxInWAL);
}

WALPageIdxPosInPageAndFrame StorageStructure::createWALVersionOfPageIfNecessaryForElement(
uint64_t elementOffset, uint64_t numElementsPerPage) {
auto originalPageCursor =
PageUtils::getPageElementCursorForPos(elementOffset, numElementsPerPage);
bool insertingNewPage = false;
if (originalPageCursor.pageIdx >= fileHandle->getNumPages()) {
assert(originalPageCursor.pageIdx == fileHandle->getNumPages());
addNewPageToFileHandle();
StorageStructureUtils::insertNewPage(*fileHandle, storageStructureID, *bufferManager, *wal);
insertingNewPage = true;
}
auto walPageIdxAndFrame =
Expand Down
15 changes: 15 additions & 0 deletions src/storage/storage_structure/storage_structure_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ std::pair<BMFileHandle*, page_idx_t> StorageStructureUtils::getFileHandleAndPhys
}
}

common::page_idx_t StorageStructureUtils::insertNewPage(BMFileHandle& fileHandle,
StorageStructureID storageStructureID, BufferManager& bufferManager, WAL& wal,
std::function<void(uint8_t*)> insertOp) {
auto newOriginalPage = fileHandle.addNewPage();
auto newWALPage = wal.logPageInsertRecord(storageStructureID, newOriginalPage);
auto walFrame = bufferManager.pin(
*wal.fileHandle, newWALPage, BufferManager::PageReadPolicy::DONT_READ_PAGE);
fileHandle.addWALPageIdxGroupIfNecessary(newOriginalPage);
fileHandle.setWALPageIdx(newOriginalPage, newWALPage);
insertOp(walFrame);
wal.fileHandle->setLockedPageDirty(newWALPage);
bufferManager.unpin(*wal.fileHandle, newWALPage);
return newOriginalPage;
}

void StorageStructureUtils::updatePage(BMFileHandle& fileHandle,
StorageStructureID storageStructureID, page_idx_t originalPageIdx, bool isInsertingNewPage,
BufferManager& bufferManager, WAL& wal, const std::function<void(uint8_t*)>& updateOp) {
Expand Down
13 changes: 1 addition & 12 deletions src/storage/store/node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,25 +285,14 @@ void NodeColumn::writeValue(
dataFH->releaseWALPageIdxLock(walPageInfo.originalPageIdx);
}

void NodeColumn::addNewPageToDataFH() {
auto pageIdxInOriginalFile = dataFH->addNewPage();
auto pageIdxInWAL = wal->logPageInsertRecord(storageStructureID, pageIdxInOriginalFile);
bufferManager->pin(
*wal->fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::DONT_READ_PAGE);
dataFH->addWALPageIdxGroupIfNecessary(pageIdxInOriginalFile);
dataFH->setWALPageIdx(pageIdxInOriginalFile, pageIdxInWAL);
wal->fileHandle->setLockedPageDirty(pageIdxInWAL);
bufferManager->unpin(*wal->fileHandle, pageIdxInWAL);
}

WALPageIdxPosInPageAndFrame NodeColumn::createWALVersionOfPageForValue(offset_t nodeOffset) {
auto nodeGroupIdx = getNodeGroupIdxFromNodeOffset(nodeOffset);
auto originalPageCursor = PageUtils::getPageElementCursorForPos(nodeOffset, numValuesPerPage);
originalPageCursor.pageIdx += metadataDA->get(nodeGroupIdx, TransactionType::WRITE).pageIdx;
bool insertingNewPage = false;
if (originalPageCursor.pageIdx >= dataFH->getNumPages()) {
assert(originalPageCursor.pageIdx == dataFH->getNumPages());
addNewPageToDataFH();
StorageStructureUtils::insertNewPage(*dataFH, storageStructureID, *bufferManager, *wal);
insertingNewPage = true;
}
auto walPageIdxAndFrame =
Expand Down
16 changes: 0 additions & 16 deletions src/storage/wal_replayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,6 @@ void WALReplayer::replayNodeTableRecord(const kuzu::storage::WALRecord& walRecor
auto catalogForCheckpointing = getCatalogForRecovery(DBFileType::WAL_VERSION);
auto nodeTableSchema = catalogForCheckpointing->getReadOnlyVersion()->getNodeTableSchema(
walRecord.nodeTableRecord.tableID);
auto metadataFH =
bufferManager->getBMFileHandle(StorageUtils::getMetadataFName(wal->getDirectory()),
FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS,
BMFileHandle::FileVersionedType::VERSIONED_FILE);
WALReplayerUtils::initTableMetadataDAsOnDisk(
catalogForCheckpointing->getReadOnlyVersion()->getNodeTableSchema(
walRecord.nodeTableRecord.tableID),
metadataFH.get());
WALReplayerUtils::createEmptyHashIndexFiles(nodeTableSchema, wal->getDirectory());
if (!isRecovering) {
// If we are not recovering, i.e., we are checkpointing during normal execution,
Expand Down Expand Up @@ -450,8 +442,6 @@ void WALReplayer::replayAddPropertyRecord(const kuzu::storage::WALRecord& walRec
auto property = tableSchema->getProperty(propertyID);
switch (tableSchema->getTableType()) {
case catalog::TableType::NODE: {
WALReplayerUtils::initPropertyMetadataDAsOnDisk(
*property, storageManager->getMetadataFH());
storageManager->getNodesStore().getNodeTable(tableID)->addProperty(*property);
} break;
case catalog::TableType::REL: {
Expand All @@ -474,12 +464,6 @@ void WALReplayer::replayAddPropertyRecord(const kuzu::storage::WALRecord& walRec
catalogForRecovery->getReadOnlyVersion()->getNodeTableSchema(tableID);
switch (tableSchema->getTableType()) {
case catalog::TableType::NODE: {
auto property = tableSchema->getProperty(propertyID);
auto metadataFH = bufferManager->getBMFileHandle(
StorageUtils::getMetadataFName(wal->getDirectory()),
FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS,
BMFileHandle::FileVersionedType::VERSIONED_FILE);
WALReplayerUtils::initPropertyMetadataDAsOnDisk(*property, metadataFH.get());
// DO NOTHING.
} break;
case catalog::TableType::REL: {
Expand Down

0 comments on commit e8d1a08

Please sign in to comment.