Skip to content

Commit

Permalink
Change metadata disk arrays (DAs) to a WAL-based transaction mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Aug 7, 2023
1 parent 5370cdf commit 663dcf5
Show file tree
Hide file tree
Showing 24 changed files with 188 additions and 178 deletions.
13 changes: 11 additions & 2 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,19 @@ void Catalog::renameTable(table_id_t tableID, const std::string& newName) {
catalogContentForWriteTrx->renameTable(tableID, newName);
}

void Catalog::addProperty(
// Adds a property, together with its metadata disk-array header info, to the schema of table
// `tableID` in the write-transaction catalog content, then logs an add-property record to the WAL.
void Catalog::addNodeProperty(table_id_t tableID, const std::string& propertyName,
    std::unique_ptr<LogicalType> dataType, std::unique_ptr<MetadataDAHInfo> metadataDAHInfo) {
    initCatalogContentForWriteTrxIfNecessary();
    auto schema = catalogContentForWriteTrx->getTableSchema(tableID);
    schema->addNodeProperty(propertyName, std::move(dataType), std::move(metadataDAHInfo));
    // The property ID is only known once the schema has registered the property, so it is
    // resolved by name after the insertion above.
    const auto newPropertyID = schema->getPropertyID(propertyName);
    wal->logAddPropertyRecord(tableID, newPropertyID);
}

void Catalog::addRelProperty(
table_id_t tableID, const std::string& propertyName, std::unique_ptr<LogicalType> dataType) {
initCatalogContentForWriteTrxIfNecessary();
catalogContentForWriteTrx->getTableSchema(tableID)->addProperty(
catalogContentForWriteTrx->getTableSchema(tableID)->addRelProperty(
propertyName, std::move(dataType));
wal->logAddPropertyRecord(
tableID, catalogContentForWriteTrx->getTableSchema(tableID)->getPropertyID(propertyName));
Expand Down
5 changes: 4 additions & 1 deletion src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ class Catalog {

void renameTable(common::table_id_t tableID, const std::string& newName);

void addProperty(common::table_id_t tableID, const std::string& propertyName,
void addNodeProperty(common::table_id_t tableID, const std::string& propertyName,
std::unique_ptr<common::LogicalType> dataType,
std::unique_ptr<MetadataDAHInfo> metadataDAHInfo);
void addRelProperty(common::table_id_t tableID, const std::string& propertyName,
std::unique_ptr<common::LogicalType> dataType);

void dropProperty(common::table_id_t tableID, common::property_id_t propertyID);
Expand Down
8 changes: 7 additions & 1 deletion src/include/catalog/table_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ class TableSchema {

std::vector<Property*> getProperties() const;

inline void addProperty(
inline void addNodeProperty(std::string propertyName,
std::unique_ptr<common::LogicalType> dataType,
std::unique_ptr<MetadataDAHInfo> metadataDAHInfo) {
properties.push_back(std::make_unique<Property>(std::move(propertyName),
std::move(dataType), increaseNextPropertyID(), tableID, std::move(metadataDAHInfo)));
}
inline void addRelProperty(
std::string propertyName, std::unique_ptr<common::LogicalType> dataType) {
properties.push_back(std::make_unique<Property>(
std::move(propertyName), std::move(dataType), increaseNextPropertyID(), tableID));
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/ddl/add_node_property.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class AddNodeProperty : public AddProperty {

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<AddNodeProperty>(catalog, tableID, propertyName, dataType->copy(),
expressionEvaluator->clone(), storageManager, outputPos, id, paramsString);
defaultValueEvaluator->clone(), storageManager, outputPos, id, paramsString);
}
};

Expand Down
12 changes: 6 additions & 6 deletions src/include/processor/operator/ddl/add_property.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@ class AddProperty : public DDL {
public:
AddProperty(catalog::Catalog* catalog, common::table_id_t tableID, std::string propertyName,
std::unique_ptr<common::LogicalType> dataType,
std::unique_ptr<evaluator::BaseExpressionEvaluator> expressionEvaluator,
std::unique_ptr<evaluator::BaseExpressionEvaluator> defaultValueEvaluator,
storage::StorageManager& storageManager, const DataPos& outputPos, uint32_t id,
const std::string& paramsString)
: DDL{PhysicalOperatorType::ADD_PROPERTY, catalog, outputPos, id, paramsString},
tableID{tableID}, propertyName{std::move(propertyName)}, dataType{std::move(dataType)},
expressionEvaluator{std::move(expressionEvaluator)}, storageManager{storageManager} {}
defaultValueEvaluator{std::move(defaultValueEvaluator)}, storageManager{storageManager} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
DDL::initLocalStateInternal(resultSet, context);
expressionEvaluator->init(*resultSet, context->memoryManager);
defaultValueEvaluator->init(*resultSet, context->memoryManager);
}

void executeDDLInternal() override;
void executeDDLInternal() override = 0;

std::string getOutputMsg() override { return {"Add Succeed."}; }

protected:
inline bool isDefaultValueNull() const {
auto expressionVector = expressionEvaluator->resultVector;
auto expressionVector = defaultValueEvaluator->resultVector;
return expressionVector->isNull(expressionVector->state->selVector->selectedPositions[0]);
}

Expand All @@ -39,7 +39,7 @@ class AddProperty : public DDL {
common::table_id_t tableID;
std::string propertyName;
std::unique_ptr<common::LogicalType> dataType;
std::unique_ptr<evaluator::BaseExpressionEvaluator> expressionEvaluator;
std::unique_ptr<evaluator::BaseExpressionEvaluator> defaultValueEvaluator;
storage::StorageManager& storageManager;
};

Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/ddl/add_rel_property.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class AddRelProperty : public AddProperty {

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<AddRelProperty>(catalog, tableID, propertyName, dataType->copy(),
expressionEvaluator->clone(), storageManager, outputPos, id, paramsString);
defaultValueEvaluator->clone(), storageManager, outputPos, id, paramsString);
}
};

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/storage_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class StorageManager {
std::unique_ptr<BMFileHandle> dataFH;
std::unique_ptr<BMFileHandle> metadataFH;
catalog::Catalog& catalog;
MemoryManager& memoryManager;
WAL* wal;
std::unique_ptr<RelsStore> relsStore;
std::unique_ptr<NodesStore> nodesStore;
Expand Down
8 changes: 8 additions & 0 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,14 @@ class InMemDiskArray : public BaseInMemDiskArray<T> {
InMemDiskArray(FileHandle& fileHandle, StorageStructureID storageStructureID,
common::page_idx_t headerPageIdx, BufferManager* bufferManager, WAL* wal);

// Inserts a new page into `fileHandle` through the WAL and initializes it with a fresh
// DiskArrayHeader constructed from sizeof(T).
//
// @param fileHandle          file that receives the new header page.
// @param storageStructureID  ID under which the page insertion is logged in the WAL.
// @param bufferManager       buffer manager used to pin the WAL page; must not be null.
// @param wal                 WAL that records the page insertion; must not be null.
// @return the page index of the new header page in `fileHandle`.
static inline common::page_idx_t initializeHeaderPage(BMFileHandle& fileHandle,
    StorageStructureID storageStructureID, BufferManager* bufferManager, WAL* wal) {
    DiskArrayHeader daHeader(sizeof(T));
    // Forward the caller-supplied storage structure ID instead of hard-coding METADATA, so the
    // WAL record matches the structure the caller is actually initializing.
    // Capturing daHeader by reference is safe: insertNewPage invokes the callback before it
    // returns.
    return StorageStructureUtils::insertNewPage(fileHandle, storageStructureID, *bufferManager,
        *wal,
        [&daHeader](uint8_t* frame) -> void { memcpy(frame, &daHeader, sizeof(DiskArrayHeader)); });
}

inline void checkpointInMemoryIfNecessary() override {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(true /* is checkpoint */);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class StorageStructureUtils {
static void readWALVersionOfPage(BMFileHandle& fileHandle, common::page_idx_t originalPageIdx,
BufferManager& bufferManager, WAL& wal, const std::function<void(uint8_t*)>& readOp);

static common::page_idx_t insertNewPage(BMFileHandle& fileHandle,
StorageStructureID storageStructureID, BufferManager& bufferManager, WAL& wal,
const std::function<void(uint8_t*)>& insertOp);

// Note: This function updates a page "transactionally", i.e., creates the WAL version of the
// page if it doesn't exist. For the original page to be updated, the current WRITE trx needs to
// commit and checkpoint.
Expand Down
24 changes: 0 additions & 24 deletions src/include/storage/wal_replayer_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,6 @@ namespace storage {

class WALReplayerUtils {
public:
// Saves the metadata disk arrays described by `property`'s MetadataDAHInfo into `metadataFH`
// by delegating to saveMetaDAs.
static inline void initPropertyMetadataDAsOnDisk(
catalog::Property& property, BMFileHandle* metadataFH) {
saveMetaDAs(metadataFH, *property.getMetadataDAHInfo());
}
// Saves the metadata disk arrays of every property of `tableSchema` into `metadataFH`, one
// property at a time via initPropertyMetadataDAsOnDisk.
static inline void initTableMetadataDAsOnDisk(
catalog::NodeTableSchema* tableSchema, BMFileHandle* metadataFH) {
for (auto& property : tableSchema->properties) {
initPropertyMetadataDAsOnDisk(*property, metadataFH);
}
}

static inline void removeHashIndexFile(
catalog::NodeTableSchema* tableSchema, const std::string& directory) {
fileOperationOnNodeFiles(
Expand Down Expand Up @@ -51,19 +40,6 @@ class WALReplayerUtils {
catalog::RelTableSchema* relTableSchema, common::property_id_t propertyID);

private:
// Recursively writes the disk arrays referenced by `metaDAHeaderInfo` to `metadataFH`: one
// builder is created and saved for the data header page, one for the null header page, and then
// the same is done for every entry in childrenInfos.
// NOTE(review): the trailing constructor argument 0 is presumably the initial number of
// elements (i.e. an empty array) -- confirm against InMemDiskArrayBuilder.
static inline void saveMetaDAs(
BMFileHandle* metadataFH, const catalog::MetadataDAHInfo& metaDAHeaderInfo) {
// Disk array rooted at the data header page.
std::make_unique<InMemDiskArrayBuilder<ColumnChunkMetadata>>(
*reinterpret_cast<FileHandle*>(metadataFH), metaDAHeaderInfo.dataDAHPageIdx, 0)
->saveToDisk();
// Disk array rooted at the null header page.
std::make_unique<InMemDiskArrayBuilder<ColumnChunkMetadata>>(
*reinterpret_cast<FileHandle*>(metadataFH), metaDAHeaderInfo.nullDAHPageIdx, 0)
->saveToDisk();
// Descend into the child header infos.
for (auto& childMetaDAHeaderInfo : metaDAHeaderInfo.childrenInfos) {
saveMetaDAs(metadataFH, *childMetaDAHeaderInfo);
}
}

static inline void removeColumnFilesForPropertyIfExists(const std::string& directory,
common::table_id_t relTableID, common::table_id_t boundTableID,
common::RelDataDirection relDirection, common::property_id_t propertyID,
Expand Down
7 changes: 3 additions & 4 deletions src/main/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,9 @@ void Connection::beginTransactionIfAutoCommit(PreparedStatement* preparedStateme
}
if (!preparedStatement->allowActiveTransaction() && activeTransaction) {
throw ConnectionException(
"DDL, CopyCSV, createMacro statements are automatically wrapped in a "
"transaction and committed. As such, they cannot be part of an "
"active transaction, please commit or rollback your previous transaction and "
"issue a ddl query without opening a transaction.");
"DDL, Copy, createMacro statements can only run in the AUTO_COMMIT mode. Please commit "
"or rollback your previous transaction if there is any and issue the query without "
"beginning a transaction");
}
if (ConnectionTransactionMode::AUTO_COMMIT == transactionMode) {
assert(!activeTransaction);
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/ddl/add_node_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ namespace kuzu {
namespace processor {

void AddNodeProperty::executeDDLInternal() {
AddProperty::executeDDLInternal();
auto property = catalog->getWriteVersion()->getNodeProperty(tableID, propertyName);
property->setMetadataDAHInfo(storageManager.initMetadataDAHInfo(*dataType));
auto metadataDAHInfo = storageManager.initMetadataDAHInfo(*dataType);
catalog->addNodeProperty(
tableID, propertyName, std::move(dataType), std::move(metadataDAHInfo));
}

} // namespace processor
Expand Down
9 changes: 2 additions & 7 deletions src/processor/operator/ddl/add_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@
namespace kuzu {
namespace processor {

void AddProperty::executeDDLInternal() {
expressionEvaluator->evaluate();
catalog->addProperty(tableID, propertyName, dataType->copy());
}

uint8_t* AddProperty::getDefaultVal() {
auto expressionVector = expressionEvaluator->resultVector;
assert(expressionEvaluator->resultVector->dataType == *dataType);
auto expressionVector = defaultValueEvaluator->resultVector;
assert(defaultValueEvaluator->resultVector->dataType == *dataType);
auto posInExpressionVector = expressionVector->state->selVector->selectedPositions[0];
return expressionVector->getData() +
expressionVector->getNumBytesPerValue() * posInExpressionVector;
Expand Down
6 changes: 4 additions & 2 deletions src/processor/operator/ddl/add_rel_property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ namespace kuzu {
namespace processor {

void AddRelProperty::executeDDLInternal() {
AddProperty::executeDDLInternal();
defaultValueEvaluator->evaluate();
auto defaultVal = getDefaultVal();
catalog->addRelProperty(tableID, propertyName, std::move(dataType));
auto tableSchema = catalog->getWriteVersion()->getRelTableSchema(tableID);
auto property = tableSchema->getProperty(tableSchema->getPropertyID(propertyName));
StorageUtils::createFileForRelPropertyWithDefaultVal(
tableSchema, *property, getDefaultVal(), isDefaultValueNull(), storageManager);
tableSchema, *property, defaultVal, isDefaultValueNull(), storageManager);
}

} // namespace processor
Expand Down
11 changes: 5 additions & 6 deletions src/processor/operator/ddl/create_node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@
#include "common/string_utils.h"
#include "storage/storage_manager.h"

using namespace kuzu::catalog;
using namespace kuzu::common;

namespace kuzu {
namespace processor {

void CreateNodeTable::executeDDLInternal() {
auto newTableID = catalog->addNodeTableSchema(
tableName, primaryKeyIdx, catalog::Property::copyProperties(properties));
nodesStatistics->addNodeStatisticsAndDeletedIDs(
catalog->getWriteVersion()->getNodeTableSchema(newTableID));
auto tableSchema = catalog->getWriteVersion()->getNodeTableSchema(newTableID);
for (auto& property : tableSchema->properties) {
for (auto& property : properties) {
property->setMetadataDAHInfo(storageManager.initMetadataDAHInfo(*property->getDataType()));
}
auto newTableID = catalog->addNodeTableSchema(tableName, primaryKeyIdx, std::move(properties));
nodesStatistics->addNodeStatisticsAndDeletedIDs(
catalog->getWriteVersion()->getNodeTableSchema(newTableID));
}

std::string CreateNodeTable::getOutputMsg() {
Expand Down
10 changes: 7 additions & 3 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace kuzu {
namespace storage {

StorageManager::StorageManager(Catalog& catalog, MemoryManager& memoryManager, WAL* wal)
: catalog{catalog}, wal{wal} {
: catalog{catalog}, memoryManager{memoryManager}, wal{wal} {
dataFH = memoryManager.getBufferManager()->getBMFileHandle(
StorageUtils::getDataFName(wal->getDirectory()),
FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS,
Expand All @@ -27,8 +27,12 @@ StorageManager::StorageManager(Catalog& catalog, MemoryManager& memoryManager, W

std::unique_ptr<MetadataDAHInfo> StorageManager::initMetadataDAHInfo(const LogicalType& dataType) {
auto metadataDAHInfo = std::make_unique<MetadataDAHInfo>();
metadataDAHInfo->dataDAHPageIdx = metadataFH->addNewPage();
metadataDAHInfo->nullDAHPageIdx = metadataFH->addNewPage();
metadataDAHInfo->dataDAHPageIdx = InMemDiskArray<ColumnChunkMetadata>::initializeHeaderPage(
*metadataFH, StorageStructureID{StorageStructureType::METADATA},
memoryManager.getBufferManager(), wal);
metadataDAHInfo->nullDAHPageIdx = InMemDiskArray<ColumnChunkMetadata>::initializeHeaderPage(
*metadataFH, StorageStructureID{StorageStructureType::METADATA},
memoryManager.getBufferManager(), wal);
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::STRUCT: {
auto fields = StructType::getFields(&dataType);
Expand Down
15 changes: 15 additions & 0 deletions src/storage/storage_structure/storage_structure_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ std::pair<BMFileHandle*, page_idx_t> StorageStructureUtils::getFileHandleAndPhys
}
}

// Appends a new page to `fileHandle` and fills its WAL version through `insertOp`.
//
// Steps, in order: add a page to the original file, log a page-insert record (which yields the
// WAL page index), pin that WAL page, record the original-page -> WAL-page mapping in
// `fileHandle`, let `insertOp` write into the pinned WAL frame, mark the WAL page dirty, and
// unpin it.
//
// @param fileHandle          file that receives the new page.
// @param storageStructureID  ID written into the WAL page-insert record.
// @param bufferManager       used to pin/unpin the WAL page.
// @param wal                 WAL that logs the insertion and owns the WAL file handle.
// @param insertOp            callback that receives the WAL frame and writes the page contents.
// @return the index of the new page in the original file (not the WAL page index).
common::page_idx_t StorageStructureUtils::insertNewPage(BMFileHandle& fileHandle,
StorageStructureID storageStructureID, BufferManager& bufferManager, WAL& wal,
const std::function<void(uint8_t*)>& insertOp) {
auto newOriginalPage = fileHandle.addNewPage();
auto newWALPage = wal.logPageInsertRecord(storageStructureID, newOriginalPage);
// DONT_READ_PAGE: the page is pinned without reading its contents; insertOp fills the frame.
auto walFrame = bufferManager.pin(
*wal.fileHandle, newWALPage, BufferManager::PageReadPolicy::DONT_READ_PAGE);
// Register the mapping from the original page to its WAL page.
fileHandle.addWALPageIdxGroupIfNecessary(newOriginalPage);
fileHandle.setWALPageIdx(newOriginalPage, newWALPage);
insertOp(walFrame);
// NOTE(review): if insertOp throws, the WAL page is left pinned -- confirm whether callbacks
// are expected to be non-throwing.
wal.fileHandle->setLockedPageDirty(newWALPage);
bufferManager.unpin(*wal.fileHandle, newWALPage);
return newOriginalPage;
}

void StorageStructureUtils::updatePage(BMFileHandle& fileHandle,
StorageStructureID storageStructureID, page_idx_t originalPageIdx, bool isInsertingNewPage,
BufferManager& bufferManager, WAL& wal, const std::function<void(uint8_t*)>& updateOp) {
Expand Down
16 changes: 0 additions & 16 deletions src/storage/wal_replayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,6 @@ void WALReplayer::replayNodeTableRecord(const kuzu::storage::WALRecord& walRecor
auto catalogForCheckpointing = getCatalogForRecovery(DBFileType::WAL_VERSION);
auto nodeTableSchema = catalogForCheckpointing->getReadOnlyVersion()->getNodeTableSchema(
walRecord.nodeTableRecord.tableID);
auto metadataFH =
bufferManager->getBMFileHandle(StorageUtils::getMetadataFName(wal->getDirectory()),
FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS,
BMFileHandle::FileVersionedType::VERSIONED_FILE);
WALReplayerUtils::initTableMetadataDAsOnDisk(
catalogForCheckpointing->getReadOnlyVersion()->getNodeTableSchema(
walRecord.nodeTableRecord.tableID),
metadataFH.get());
WALReplayerUtils::createEmptyHashIndexFiles(nodeTableSchema, wal->getDirectory());
if (!isRecovering) {
// If we are not recovering, i.e., we are checkpointing during normal execution,
Expand Down Expand Up @@ -450,8 +442,6 @@ void WALReplayer::replayAddPropertyRecord(const kuzu::storage::WALRecord& walRec
auto property = tableSchema->getProperty(propertyID);
switch (tableSchema->getTableType()) {
case catalog::TableType::NODE: {
WALReplayerUtils::initPropertyMetadataDAsOnDisk(
*property, storageManager->getMetadataFH());
storageManager->getNodesStore().getNodeTable(tableID)->addProperty(*property);
} break;
case catalog::TableType::REL: {
Expand All @@ -474,12 +464,6 @@ void WALReplayer::replayAddPropertyRecord(const kuzu::storage::WALRecord& walRec
catalogForRecovery->getReadOnlyVersion()->getNodeTableSchema(tableID);
switch (tableSchema->getTableType()) {
case catalog::TableType::NODE: {
auto property = tableSchema->getProperty(propertyID);
auto metadataFH = bufferManager->getBMFileHandle(
StorageUtils::getMetadataFName(wal->getDirectory()),
FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS,
BMFileHandle::FileVersionedType::VERSIONED_FILE);
WALReplayerUtils::initPropertyMetadataDAsOnDisk(*property, metadataFH.get());
// DO NOTHING.
} break;
case catalog::TableType::REL: {
Expand Down
22 changes: 0 additions & 22 deletions test/runner/e2e_copy_transaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,27 +212,5 @@ TEST_F(TinySnbCopyCSVTransactionTest, CopyRelCommitRecovery) {
copyRelCSVCommitAndRecoveryTest(TransactionTestType::RECOVERY);
}

// Creates both tables, then checks the result message returned by each of the two copy commands
// (copyPersonTableCMD and copyKnowsTableCMD).
TEST_F(TinySnbCopyCSVTransactionTest, CopyNodeOutputMsg) {
conn->query(createPersonTableCMD);
conn->query(createKnowsTableCMD);
auto result = conn->query(copyPersonTableCMD);
ASSERT_EQ(TestHelper::convertResultToString(*result),
std::vector<std::string>{"8 number of tuples has been copied to table: person."});
result = conn->query(copyKnowsTableCMD);
ASSERT_EQ(TestHelper::convertResultToString(*result),
std::vector<std::string>{"14 number of tuples has been copied to table: knows."});
}

// Runs copyPersonTableCMD while a manually begun write transaction is active and checks the
// exact error message returned by the query.
TEST_F(TinySnbCopyCSVTransactionTest, CopyCSVStatementWithActiveTransactionErrorTest) {
auto re = conn->query(createPersonTableCMD);
ASSERT_TRUE(re->isSuccess());
// Open a write transaction explicitly so the next statement runs inside an active transaction.
conn->beginWriteTransaction();
auto result = conn->query(copyPersonTableCMD);
ASSERT_EQ(result->getErrorMessage(),
"DDL, CopyCSV, createMacro statements are automatically wrapped in a transaction and "
"committed. As such, they cannot be part of an active transaction, please commit or "
"rollback your previous transaction and issue a ddl query without opening a transaction.");
}

} // namespace testing
} // namespace kuzu
8 changes: 0 additions & 8 deletions test/runner/e2e_create_macro_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,5 @@ TEST_F(CreateMacroTestTrxTest, createMacroReadTrxError) {
"Can't execute a write query inside a read-only transaction.");
}

// Issues a CREATE MACRO statement while a manually begun write transaction is active and checks
// the exact error message returned by the query.
TEST_F(CreateMacroTestTrxTest, createMacroWithActiveTrxError) {
conn->beginWriteTransaction();
ASSERT_EQ(conn->query("CREATE MACRO var_macro(x) AS x")->getErrorMessage(),
"DDL, CopyCSV, createMacro statements are automatically wrapped in a transaction and "
"committed. As such, they cannot be part of an active transaction, please commit or "
"rollback your previous transaction and issue a ddl query without opening a transaction.");
}

} // namespace testing
} // namespace kuzu
Loading

0 comments on commit 663dcf5

Please sign in to comment.