Skip to content

Commit

Permalink
Remove file system from catalog and statistics (#3258)
Browse files Browse the repository at this point in the history
* Remove file system from catalog and statistics

* fix extension
  • Loading branch information
andyfengHKU committed Apr 12, 2024
1 parent f479fab commit 65a080f
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 138 deletions.
2 changes: 1 addition & 1 deletion extension/duckdb_scanner/src/include/duckdb_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct BoundExtraCreateDuckDBTableInfo : public binder::BoundExtraCreateTableInf

class DuckDBCatalogContent : public catalog::CatalogContent {
public:
DuckDBCatalogContent() : catalog::CatalogContent{nullptr /* vfs */} {}
DuckDBCatalogContent() : catalog::CatalogContent{} {}

virtual void init(const std::string& dbPath, const std::string& catalogName,
main::ClientContext* context);
Expand Down
8 changes: 4 additions & 4 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ using namespace kuzu::transaction;
namespace kuzu {
namespace catalog {

Catalog::Catalog(VirtualFileSystem* vfs) : isUpdated{false}, wal{nullptr} {
readOnlyVersion = std::make_unique<CatalogContent>(vfs);
Catalog::Catalog() : isUpdated{false}, wal{nullptr} {
readOnlyVersion = std::make_unique<CatalogContent>();
}

Catalog::Catalog(WAL* wal, VirtualFileSystem* vfs) : isUpdated{false}, wal{wal} {
Expand Down Expand Up @@ -103,11 +103,11 @@ std::vector<TableCatalogEntry*> Catalog::getTableSchemas(Transaction* tx,
return result;
}

void Catalog::prepareCommitOrRollback(TransactionAction action) {
void Catalog::prepareCommitOrRollback(TransactionAction action, VirtualFileSystem* fs) {
if (hasUpdates()) {
wal->logCatalogRecord();
if (action == TransactionAction::COMMIT) {
readWriteVersion->saveToFile(wal->getDirectory(), FileVersionType::WAL_VERSION);
readWriteVersion->saveToFile(wal->getDirectory(), FileVersionType::WAL_VERSION, fs);
}
}
}
Expand Down
23 changes: 12 additions & 11 deletions src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ using namespace kuzu::storage;
namespace kuzu {
namespace catalog {

CatalogContent::CatalogContent(VirtualFileSystem* vfs) : nextTableID{0}, vfs{vfs} {
CatalogContent::CatalogContent() : nextTableID{0} {
tables = std::make_unique<CatalogSet>();
functions = std::make_unique<CatalogSet>();
registerBuiltInFunctions();
}

CatalogContent::CatalogContent(const std::string& directory, VirtualFileSystem* vfs) : vfs{vfs} {
readFromFile(directory, FileVersionType::ORIGINAL);
CatalogContent::CatalogContent(const std::string& directory, VirtualFileSystem* fs) {
readFromFile(directory, FileVersionType::ORIGINAL, fs);
registerBuiltInFunctions();
}

Expand Down Expand Up @@ -239,21 +239,23 @@ static void writeMagicBytes(Serializer& serializer) {
}
}

void CatalogContent::saveToFile(const std::string& directory, FileVersionType dbFileType) {
auto catalogPath = StorageUtils::getCatalogFilePath(vfs, directory, dbFileType);
void CatalogContent::saveToFile(const std::string& directory, FileVersionType dbFileType,
VirtualFileSystem* fs) {
auto catalogPath = StorageUtils::getCatalogFilePath(fs, directory, dbFileType);
Serializer serializer(
std::make_unique<BufferedFileWriter>(vfs->openFile(catalogPath, O_WRONLY | O_CREAT)));
std::make_unique<BufferedFileWriter>(fs->openFile(catalogPath, O_WRONLY | O_CREAT)));
writeMagicBytes(serializer);
serializer.serializeValue(StorageVersionInfo::getStorageVersion());
tables->serialize(serializer);
serializer.serializeValue(nextTableID);
functions->serialize(serializer);
}

void CatalogContent::readFromFile(const std::string& directory, FileVersionType dbFileType) {
auto catalogPath = StorageUtils::getCatalogFilePath(vfs, directory, dbFileType);
void CatalogContent::readFromFile(const std::string& directory, FileVersionType dbFileType,
VirtualFileSystem* fs) {
auto catalogPath = StorageUtils::getCatalogFilePath(fs, directory, dbFileType);
Deserializer deserializer(
std::make_unique<BufferedFileReader>(vfs->openFile(catalogPath, O_RDONLY)));
std::make_unique<BufferedFileReader>(fs->openFile(catalogPath, O_RDONLY)));
validateMagicBytes(deserializer);
storage_version_t savedStorageVersion;
deserializer.deserializeValue(savedStorageVersion);
Expand All @@ -278,8 +280,7 @@ function::ScalarMacroFunction* CatalogContent::getScalarMacroFunction(
}

std::unique_ptr<CatalogContent> CatalogContent::copy() const {
std::unordered_map<std::string, std::unique_ptr<function::ScalarMacroFunction>> macrosToCopy;
return std::make_unique<CatalogContent>(tables->copy(), nextTableID, functions->copy(), vfs);
return std::make_unique<CatalogContent>(tables->copy(), nextTableID, functions->copy());
}

void CatalogContent::registerBuiltInFunctions() {
Expand Down
11 changes: 6 additions & 5 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RDFGraphCatalogEntry;

class Catalog {
public:
explicit Catalog(common::VirtualFileSystem* vfs);
Catalog();

Catalog(storage::WAL* wal, common::VirtualFileSystem* vfs);

Expand Down Expand Up @@ -73,7 +73,8 @@ class Catalog {
std::vector<std::string> getMacroNames(transaction::Transaction* tx) const;

// ----------------------------- Tx ----------------------------
void prepareCommitOrRollback(transaction::TransactionAction action);
void prepareCommitOrRollback(transaction::TransactionAction action,
common::VirtualFileSystem* fs);
void checkpointInMemory();

void initCatalogContentForWriteTrxIfNecessary() {
Expand All @@ -83,9 +84,9 @@ class Catalog {
}

static void saveInitialCatalogToFile(const std::string& directory,
common::VirtualFileSystem* vfs) {
std::make_unique<Catalog>(vfs)->getReadOnlyVersion()->saveToFile(directory,
common::FileVersionType::ORIGINAL);
common::VirtualFileSystem* fs) {
auto catalog = Catalog();
catalog.getReadOnlyVersion()->saveToFile(directory, common::FileVersionType::ORIGINAL, fs);
}

private:
Expand Down
14 changes: 7 additions & 7 deletions src/include/catalog/catalog_content.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,23 @@ class CatalogContent {
friend class Catalog;

public:
KUZU_API explicit CatalogContent(common::VirtualFileSystem* vfs);
KUZU_API CatalogContent();

virtual ~CatalogContent() = default;

CatalogContent(const std::string& directory, common::VirtualFileSystem* vfs);

CatalogContent(std::unique_ptr<CatalogSet> tables, common::table_id_t nextTableID,
std::unique_ptr<CatalogSet> functions, common::VirtualFileSystem* vfs)
: tables{std::move(tables)}, nextTableID{nextTableID}, vfs{vfs},
functions{std::move(functions)} {}
std::unique_ptr<CatalogSet> functions)
: tables{std::move(tables)}, nextTableID{nextTableID}, functions{std::move(functions)} {}

common::table_id_t getTableID(const std::string& tableName) const;
CatalogEntry* getTableCatalogEntry(common::table_id_t tableID) const;

void saveToFile(const std::string& directory, common::FileVersionType dbFileType);
void readFromFile(const std::string& directory, common::FileVersionType dbFileType);
void saveToFile(const std::string& directory, common::FileVersionType dbFileType,
common::VirtualFileSystem* fs);
void readFromFile(const std::string& directory, common::FileVersionType dbFileType,
common::VirtualFileSystem* fs);

std::unique_ptr<CatalogContent> copy() const;

Expand Down Expand Up @@ -97,7 +98,6 @@ class CatalogContent {

private:
common::table_id_t nextTableID;
common::VirtualFileSystem* vfs;
std::unique_ptr<CatalogSet> functions;
};

Expand Down
14 changes: 6 additions & 8 deletions src/include/storage/stats/node_table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ class NodeTableStatsAndDeletedIDs : public TableStatistics {
propertyStatistics);
NodeTableStatsAndDeletedIDs(const NodeTableStatsAndDeletedIDs& other);

inline common::offset_t getMaxNodeOffset() {
return getMaxNodeOffsetFromNumTuples(getNumTuples());
}
common::offset_t getMaxNodeOffset() { return getMaxNodeOffsetFromNumTuples(getNumTuples()); }

common::offset_t addNode();

Expand All @@ -43,21 +41,21 @@ class NodeTableStatsAndDeletedIDs : public TableStatistics {

std::vector<common::offset_t> getDeletedNodeOffsets() const;

static inline uint64_t getNumTuplesFromMaxNodeOffset(common::offset_t maxNodeOffset) {
static uint64_t getNumTuplesFromMaxNodeOffset(common::offset_t maxNodeOffset) {
return (maxNodeOffset == UINT64_MAX) ? 0ull : maxNodeOffset + 1ull;
}
static inline uint64_t getMaxNodeOffsetFromNumTuples(uint64_t numTuples) {
static uint64_t getMaxNodeOffsetFromNumTuples(uint64_t numTuples) {
return numTuples == 0 ? UINT64_MAX : numTuples - 1;
}

inline void addMetadataDAHInfoForColumn(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo) {
void addMetadataDAHInfoForColumn(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo) {
metadataDAHInfos.push_back(std::move(metadataDAHInfo));
}
inline void removeMetadataDAHInfoForColumn(common::column_id_t columnID) {
void removeMetadataDAHInfoForColumn(common::column_id_t columnID) {
KU_ASSERT(columnID < metadataDAHInfos.size());
metadataDAHInfos.erase(metadataDAHInfos.begin() + columnID);
}
inline MetadataDAHInfo* getMetadataDAHInfo(common::column_id_t columnID) {
MetadataDAHInfo* getMetadataDAHInfo(common::column_id_t columnID) {
KU_ASSERT(columnID < metadataDAHInfos.size());
return metadataDAHInfos[columnID].get();
}
Expand Down
48 changes: 19 additions & 29 deletions src/include/storage/stats/nodes_store_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,37 @@ class NodesStoreStatsAndDeletedIDs : public TablesStatistics {
public:
// Should only be used by saveInitialNodesStatisticsAndDeletedIDsToFile to start a database
// from an empty directory.
explicit NodesStoreStatsAndDeletedIDs(common::VirtualFileSystem* vfs)
: TablesStatistics{nullptr /* metadataFH */, nullptr /* bufferManager */, nullptr /* wal */,
vfs} {};
NodesStoreStatsAndDeletedIDs()
: TablesStatistics{nullptr /* metadataFH */, nullptr /* bufferManager */,
nullptr /* wal */} {};
// Should be used when an already loaded database is started from a directory.
NodesStoreStatsAndDeletedIDs(BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
common::VirtualFileSystem* vfs,
common::VirtualFileSystem* fs,
common::FileVersionType dbFileType = common::FileVersionType::ORIGINAL)
: TablesStatistics{metadataFH, bufferManager, wal, vfs} {
readFromFile(dbFileType);
: TablesStatistics{metadataFH, bufferManager, wal} {
readFromFile(dbFileType, fs);
}

inline NodeTableStatsAndDeletedIDs* getNodeStatisticsAndDeletedIDs(
NodeTableStatsAndDeletedIDs* getNodeStatisticsAndDeletedIDs(
transaction::Transaction* transaction, common::table_id_t tableID) const {
return getNodeTableStats(transaction->getType(), tableID);
}

static inline void saveInitialNodesStatisticsAndDeletedIDsToFile(common::VirtualFileSystem* vfs,
static void saveInitialNodesStatisticsAndDeletedIDsToFile(common::VirtualFileSystem* fs,
const std::string& directory) {
std::make_unique<NodesStoreStatsAndDeletedIDs>(vfs)->saveToFile(directory,
common::FileVersionType::ORIGINAL, transaction::TransactionType::READ_ONLY);
auto stats = NodesStoreStatsAndDeletedIDs();
stats.saveToFile(directory, common::FileVersionType::ORIGINAL,
transaction::TransactionType::READ_ONLY, fs);
}

void updateNumTuplesByValue(common::table_id_t tableID, int64_t value) override;

common::offset_t getMaxNodeOffset(transaction::Transaction* transaction,
common::table_id_t tableID);

// This function is only used for testing purpose.
inline uint32_t getNumNodeStatisticsAndDeleteIDsPerTable() const {
return readOnlyVersion->tableStatisticPerTable.size();
}

// This function assumes that there is a single write transaction. That is why for now we
// keep the interface simple and no transaction is passed.
inline common::offset_t addNode(common::table_id_t tableID) {
common::offset_t addNode(common::table_id_t tableID) {
lock_t lck{mtx};
initTableStatisticsForWriteTrxNoLock();
KU_ASSERT(readWriteVersion && readWriteVersion->tableStatisticPerTable.contains(tableID));
Expand All @@ -57,7 +53,7 @@ class NodesStoreStatsAndDeletedIDs : public TablesStatistics {
}

// Refer to the comments for addNode.
inline void deleteNode(common::table_id_t tableID, common::offset_t nodeOffset) {
void deleteNode(common::table_id_t tableID, common::offset_t nodeOffset) {
lock_t lck{mtx};
initTableStatisticsForWriteTrxNoLock();
KU_ASSERT(readWriteVersion && readWriteVersion->tableStatisticPerTable.contains(tableID));
Expand All @@ -76,26 +72,20 @@ class NodesStoreStatsAndDeletedIDs : public TablesStatistics {
common::table_id_t tableID, common::column_id_t columnID);

protected:
inline std::unique_ptr<TableStatistics> constructTableStatistic(
std::unique_ptr<TableStatistics> constructTableStatistic(
catalog::TableCatalogEntry* tableEntry) override {
return std::make_unique<NodeTableStatsAndDeletedIDs>(metadataFH, *tableEntry, bufferManager,
wal);
}

inline std::unique_ptr<TableStatistics> constructTableStatistic(
TableStatistics* tableStatistics) override {
return std::make_unique<NodeTableStatsAndDeletedIDs>(
*(NodeTableStatsAndDeletedIDs*)tableStatistics);
}

inline std::string getTableStatisticsFilePath(const std::string& directory,
common::FileVersionType dbFileType) override {
return StorageUtils::getNodesStatisticsAndDeletedIDsFilePath(vfs, directory, dbFileType);
std::string getTableStatisticsFilePath(const std::string& directory,
common::FileVersionType dbFileType, common::VirtualFileSystem* fs) override {
return StorageUtils::getNodesStatisticsAndDeletedIDsFilePath(fs, directory, dbFileType);
}

private:
inline NodeTableStatsAndDeletedIDs* getNodeTableStats(
transaction::TransactionType transactionType, common::table_id_t tableID) const {
NodeTableStatsAndDeletedIDs* getNodeTableStats(transaction::TransactionType transactionType,
common::table_id_t tableID) const {
return transactionType == transaction::TransactionType::READ_ONLY ?
dynamic_cast<NodeTableStatsAndDeletedIDs*>(
readOnlyVersion->tableStatisticPerTable.at(tableID).get()) :
Expand Down
20 changes: 9 additions & 11 deletions src/include/storage/stats/rel_table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,29 @@ class RelTableStats : public TableStatistics {

RelTableStats(const RelTableStats& other);

inline common::offset_t getNextRelOffset() const { return nextRelOffset; }
inline void incrementNextRelOffset(uint64_t numTuples) { nextRelOffset += numTuples; }
common::offset_t getNextRelOffset() const { return nextRelOffset; }
void incrementNextRelOffset(uint64_t numTuples) { nextRelOffset += numTuples; }

inline void addMetadataDAHInfoForColumn(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo,
void addMetadataDAHInfoForColumn(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo,
common::RelDataDirection direction) {
auto& metadataDAHInfos = getDirectedMetadataDAHInfosRef(direction);
metadataDAHInfos.push_back(std::move(metadataDAHInfo));
}
inline void removeMetadataDAHInfoForColumn(common::column_id_t columnID,
void removeMetadataDAHInfoForColumn(common::column_id_t columnID,
common::RelDataDirection direction) {
auto& metadataDAHInfos = getDirectedMetadataDAHInfosRef(direction);
KU_ASSERT(columnID < metadataDAHInfos.size());
metadataDAHInfos.erase(metadataDAHInfos.begin() + columnID);
}
inline MetadataDAHInfo* getCSROffsetMetadataDAHInfo(common::RelDataDirection direction) {
MetadataDAHInfo* getCSROffsetMetadataDAHInfo(common::RelDataDirection direction) {
return direction == common::RelDataDirection::FWD ? fwdCSROffsetMetadataDAHInfo.get() :
bwdCSROffsetMetadataDAHInfo.get();
}
inline MetadataDAHInfo* getCSRLengthMetadataDAHInfo(common::RelDataDirection direction) {
MetadataDAHInfo* getCSRLengthMetadataDAHInfo(common::RelDataDirection direction) {
return direction == common::RelDataDirection::FWD ? fwdCSRLengthMetadataDAHInfo.get() :
bwdCSRLengthMetadataDAHInfo.get();
}
inline MetadataDAHInfo* getColumnMetadataDAHInfo(common::column_id_t columnID,
MetadataDAHInfo* getColumnMetadataDAHInfo(common::column_id_t columnID,
common::RelDataDirection direction) {
auto& metadataDAHInfos = getDirectedMetadataDAHInfosRef(direction);
KU_ASSERT(columnID < metadataDAHInfos.size());
Expand All @@ -58,12 +58,10 @@ class RelTableStats : public TableStatistics {
static std::unique_ptr<RelTableStats> deserialize(uint64_t numRels, common::table_id_t tableID,
common::Deserializer& deserializer);

inline std::unique_ptr<TableStatistics> copy() final {
return std::make_unique<RelTableStats>(*this);
}
std::unique_ptr<TableStatistics> copy() final { return std::make_unique<RelTableStats>(*this); }

private:
inline std::vector<std::unique_ptr<MetadataDAHInfo>>& getDirectedMetadataDAHInfosRef(
std::vector<std::unique_ptr<MetadataDAHInfo>>& getDirectedMetadataDAHInfosRef(
common::RelDataDirection direction) {
return direction == common::RelDataDirection::FWD ? fwdMetadataDAHInfos :
bwdMetadataDAHInfos;
Expand Down
Loading

0 comments on commit 65a080f

Please sign in to comment.