Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove file system from catalog and statistics #3258

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extension/duckdb_scanner/src/include/duckdb_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct BoundExtraCreateDuckDBTableInfo : public binder::BoundExtraCreateTableInf

class DuckDBCatalogContent : public catalog::CatalogContent {
public:
DuckDBCatalogContent() : catalog::CatalogContent{nullptr /* vfs */} {}
DuckDBCatalogContent() : catalog::CatalogContent{} {}

virtual void init(const std::string& dbPath, const std::string& catalogName,
main::ClientContext* context);
Expand Down
8 changes: 4 additions & 4 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ using namespace kuzu::transaction;
namespace kuzu {
namespace catalog {

Catalog::Catalog(VirtualFileSystem* vfs) : isUpdated{false}, wal{nullptr} {
readOnlyVersion = std::make_unique<CatalogContent>(vfs);
Catalog::Catalog() : isUpdated{false}, wal{nullptr} {
readOnlyVersion = std::make_unique<CatalogContent>();
}

Catalog::Catalog(WAL* wal, VirtualFileSystem* vfs) : isUpdated{false}, wal{wal} {
Expand Down Expand Up @@ -103,11 +103,11 @@ std::vector<TableCatalogEntry*> Catalog::getTableSchemas(Transaction* tx,
return result;
}

void Catalog::prepareCommitOrRollback(TransactionAction action) {
void Catalog::prepareCommitOrRollback(TransactionAction action, VirtualFileSystem* fs) {
if (hasUpdates()) {
wal->logCatalogRecord();
if (action == TransactionAction::COMMIT) {
readWriteVersion->saveToFile(wal->getDirectory(), FileVersionType::WAL_VERSION);
readWriteVersion->saveToFile(wal->getDirectory(), FileVersionType::WAL_VERSION, fs);
}
}
}
Expand Down
23 changes: 12 additions & 11 deletions src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ using namespace kuzu::storage;
namespace kuzu {
namespace catalog {

CatalogContent::CatalogContent(VirtualFileSystem* vfs) : nextTableID{0}, vfs{vfs} {
CatalogContent::CatalogContent() : nextTableID{0} {
tables = std::make_unique<CatalogSet>();
functions = std::make_unique<CatalogSet>();
registerBuiltInFunctions();
}

CatalogContent::CatalogContent(const std::string& directory, VirtualFileSystem* vfs) : vfs{vfs} {
readFromFile(directory, FileVersionType::ORIGINAL);
CatalogContent::CatalogContent(const std::string& directory, VirtualFileSystem* fs) {
readFromFile(directory, FileVersionType::ORIGINAL, fs);
registerBuiltInFunctions();
}

Expand Down Expand Up @@ -239,21 +239,23 @@ static void writeMagicBytes(Serializer& serializer) {
}
}

void CatalogContent::saveToFile(const std::string& directory, FileVersionType dbFileType) {
auto catalogPath = StorageUtils::getCatalogFilePath(vfs, directory, dbFileType);
void CatalogContent::saveToFile(const std::string& directory, FileVersionType dbFileType,
VirtualFileSystem* fs) {
auto catalogPath = StorageUtils::getCatalogFilePath(fs, directory, dbFileType);
Serializer serializer(
std::make_unique<BufferedFileWriter>(vfs->openFile(catalogPath, O_WRONLY | O_CREAT)));
std::make_unique<BufferedFileWriter>(fs->openFile(catalogPath, O_WRONLY | O_CREAT)));
writeMagicBytes(serializer);
serializer.serializeValue(StorageVersionInfo::getStorageVersion());
tables->serialize(serializer);
serializer.serializeValue(nextTableID);
functions->serialize(serializer);
}

void CatalogContent::readFromFile(const std::string& directory, FileVersionType dbFileType) {
auto catalogPath = StorageUtils::getCatalogFilePath(vfs, directory, dbFileType);
void CatalogContent::readFromFile(const std::string& directory, FileVersionType dbFileType,
VirtualFileSystem* fs) {
auto catalogPath = StorageUtils::getCatalogFilePath(fs, directory, dbFileType);
Deserializer deserializer(
std::make_unique<BufferedFileReader>(vfs->openFile(catalogPath, O_RDONLY)));
std::make_unique<BufferedFileReader>(fs->openFile(catalogPath, O_RDONLY)));
validateMagicBytes(deserializer);
storage_version_t savedStorageVersion;
deserializer.deserializeValue(savedStorageVersion);
Expand All @@ -278,8 +280,7 @@ function::ScalarMacroFunction* CatalogContent::getScalarMacroFunction(
}

std::unique_ptr<CatalogContent> CatalogContent::copy() const {
std::unordered_map<std::string, std::unique_ptr<function::ScalarMacroFunction>> macrosToCopy;
return std::make_unique<CatalogContent>(tables->copy(), nextTableID, functions->copy(), vfs);
return std::make_unique<CatalogContent>(tables->copy(), nextTableID, functions->copy());
}

void CatalogContent::registerBuiltInFunctions() {
Expand Down
11 changes: 6 additions & 5 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RDFGraphCatalogEntry;

class Catalog {
public:
explicit Catalog(common::VirtualFileSystem* vfs);
Catalog();

Catalog(storage::WAL* wal, common::VirtualFileSystem* vfs);

Expand Down Expand Up @@ -73,7 +73,8 @@ class Catalog {
std::vector<std::string> getMacroNames(transaction::Transaction* tx) const;

// ----------------------------- Tx ----------------------------
void prepareCommitOrRollback(transaction::TransactionAction action);
void prepareCommitOrRollback(transaction::TransactionAction action,
common::VirtualFileSystem* fs);
void checkpointInMemory();

void initCatalogContentForWriteTrxIfNecessary() {
Expand All @@ -83,9 +84,9 @@ class Catalog {
}

static void saveInitialCatalogToFile(const std::string& directory,
common::VirtualFileSystem* vfs) {
std::make_unique<Catalog>(vfs)->getReadOnlyVersion()->saveToFile(directory,
common::FileVersionType::ORIGINAL);
common::VirtualFileSystem* fs) {
auto catalog = Catalog();
catalog.getReadOnlyVersion()->saveToFile(directory, common::FileVersionType::ORIGINAL, fs);
}

private:
Expand Down
14 changes: 7 additions & 7 deletions src/include/catalog/catalog_content.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,23 @@ class CatalogContent {
friend class Catalog;

public:
KUZU_API explicit CatalogContent(common::VirtualFileSystem* vfs);
KUZU_API CatalogContent();

virtual ~CatalogContent() = default;

CatalogContent(const std::string& directory, common::VirtualFileSystem* vfs);

CatalogContent(std::unique_ptr<CatalogSet> tables, common::table_id_t nextTableID,
std::unique_ptr<CatalogSet> functions, common::VirtualFileSystem* vfs)
: tables{std::move(tables)}, nextTableID{nextTableID}, vfs{vfs},
functions{std::move(functions)} {}
std::unique_ptr<CatalogSet> functions)
: tables{std::move(tables)}, nextTableID{nextTableID}, functions{std::move(functions)} {}

common::table_id_t getTableID(const std::string& tableName) const;
CatalogEntry* getTableCatalogEntry(common::table_id_t tableID) const;

void saveToFile(const std::string& directory, common::FileVersionType dbFileType);
void readFromFile(const std::string& directory, common::FileVersionType dbFileType);
void saveToFile(const std::string& directory, common::FileVersionType dbFileType,
common::VirtualFileSystem* fs);
void readFromFile(const std::string& directory, common::FileVersionType dbFileType,
common::VirtualFileSystem* fs);

std::unique_ptr<CatalogContent> copy() const;

Expand Down Expand Up @@ -97,7 +98,6 @@ class CatalogContent {

private:
common::table_id_t nextTableID;
common::VirtualFileSystem* vfs;
std::unique_ptr<CatalogSet> functions;
};

Expand Down
14 changes: 6 additions & 8 deletions src/include/storage/stats/node_table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ class NodeTableStatsAndDeletedIDs : public TableStatistics {
propertyStatistics);
NodeTableStatsAndDeletedIDs(const NodeTableStatsAndDeletedIDs& other);

inline common::offset_t getMaxNodeOffset() {
return getMaxNodeOffsetFromNumTuples(getNumTuples());
}
common::offset_t getMaxNodeOffset() { return getMaxNodeOffsetFromNumTuples(getNumTuples()); }

common::offset_t addNode();

Expand All @@ -43,21 +41,21 @@ class NodeTableStatsAndDeletedIDs : public TableStatistics {

std::vector<common::offset_t> getDeletedNodeOffsets() const;

static inline uint64_t getNumTuplesFromMaxNodeOffset(common::offset_t maxNodeOffset) {
static uint64_t getNumTuplesFromMaxNodeOffset(common::offset_t maxNodeOffset) {
return (maxNodeOffset == UINT64_MAX) ? 0ull : maxNodeOffset + 1ull;
}
static inline uint64_t getMaxNodeOffsetFromNumTuples(uint64_t numTuples) {
static uint64_t getMaxNodeOffsetFromNumTuples(uint64_t numTuples) {
return numTuples == 0 ? UINT64_MAX : numTuples - 1;
}

inline void addMetadataDAHInfoForColumn(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo) {
void addMetadataDAHInfoForColumn(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo) {
metadataDAHInfos.push_back(std::move(metadataDAHInfo));
}
inline void removeMetadataDAHInfoForColumn(common::column_id_t columnID) {
void removeMetadataDAHInfoForColumn(common::column_id_t columnID) {
KU_ASSERT(columnID < metadataDAHInfos.size());
metadataDAHInfos.erase(metadataDAHInfos.begin() + columnID);
}
inline MetadataDAHInfo* getMetadataDAHInfo(common::column_id_t columnID) {
MetadataDAHInfo* getMetadataDAHInfo(common::column_id_t columnID) {
KU_ASSERT(columnID < metadataDAHInfos.size());
return metadataDAHInfos[columnID].get();
}
Expand Down
48 changes: 19 additions & 29 deletions src/include/storage/stats/nodes_store_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,37 @@ class NodesStoreStatsAndDeletedIDs : public TablesStatistics {
public:
// Should only be used by saveInitialNodesStatisticsAndDeletedIDsToFile to start a database
// from an empty directory.
explicit NodesStoreStatsAndDeletedIDs(common::VirtualFileSystem* vfs)
: TablesStatistics{nullptr /* metadataFH */, nullptr /* bufferManager */, nullptr /* wal */,
vfs} {};
NodesStoreStatsAndDeletedIDs()
: TablesStatistics{nullptr /* metadataFH */, nullptr /* bufferManager */,
nullptr /* wal */} {};
// Should be used when an already loaded database is started from a directory.
NodesStoreStatsAndDeletedIDs(BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
common::VirtualFileSystem* vfs,
common::VirtualFileSystem* fs,
common::FileVersionType dbFileType = common::FileVersionType::ORIGINAL)
: TablesStatistics{metadataFH, bufferManager, wal, vfs} {
readFromFile(dbFileType);
: TablesStatistics{metadataFH, bufferManager, wal} {
readFromFile(dbFileType, fs);
}

inline NodeTableStatsAndDeletedIDs* getNodeStatisticsAndDeletedIDs(
NodeTableStatsAndDeletedIDs* getNodeStatisticsAndDeletedIDs(
transaction::Transaction* transaction, common::table_id_t tableID) const {
return getNodeTableStats(transaction->getType(), tableID);
}

static inline void saveInitialNodesStatisticsAndDeletedIDsToFile(common::VirtualFileSystem* vfs,
static void saveInitialNodesStatisticsAndDeletedIDsToFile(common::VirtualFileSystem* fs,
const std::string& directory) {
std::make_unique<NodesStoreStatsAndDeletedIDs>(vfs)->saveToFile(directory,
common::FileVersionType::ORIGINAL, transaction::TransactionType::READ_ONLY);
auto stats = NodesStoreStatsAndDeletedIDs();
stats.saveToFile(directory, common::FileVersionType::ORIGINAL,
transaction::TransactionType::READ_ONLY, fs);
}

void updateNumTuplesByValue(common::table_id_t tableID, int64_t value) override;

common::offset_t getMaxNodeOffset(transaction::Transaction* transaction,
common::table_id_t tableID);

// This function is only used for testing purpose.
inline uint32_t getNumNodeStatisticsAndDeleteIDsPerTable() const {
return readOnlyVersion->tableStatisticPerTable.size();
}

// This function assumes that there is a single write transaction. That is why for now we
// keep the interface simple and no transaction is passed.
inline common::offset_t addNode(common::table_id_t tableID) {
common::offset_t addNode(common::table_id_t tableID) {
lock_t lck{mtx};
initTableStatisticsForWriteTrxNoLock();
KU_ASSERT(readWriteVersion && readWriteVersion->tableStatisticPerTable.contains(tableID));
Expand All @@ -57,7 +53,7 @@ class NodesStoreStatsAndDeletedIDs : public TablesStatistics {
}

// Refer to the comments for addNode.
inline void deleteNode(common::table_id_t tableID, common::offset_t nodeOffset) {
void deleteNode(common::table_id_t tableID, common::offset_t nodeOffset) {
lock_t lck{mtx};
initTableStatisticsForWriteTrxNoLock();
KU_ASSERT(readWriteVersion && readWriteVersion->tableStatisticPerTable.contains(tableID));
Expand All @@ -76,26 +72,20 @@ class NodesStoreStatsAndDeletedIDs : public TablesStatistics {
common::table_id_t tableID, common::column_id_t columnID);

protected:
inline std::unique_ptr<TableStatistics> constructTableStatistic(
std::unique_ptr<TableStatistics> constructTableStatistic(
catalog::TableCatalogEntry* tableEntry) override {
return std::make_unique<NodeTableStatsAndDeletedIDs>(metadataFH, *tableEntry, bufferManager,
wal);
}

inline std::unique_ptr<TableStatistics> constructTableStatistic(
TableStatistics* tableStatistics) override {
return std::make_unique<NodeTableStatsAndDeletedIDs>(
*(NodeTableStatsAndDeletedIDs*)tableStatistics);
}

inline std::string getTableStatisticsFilePath(const std::string& directory,
common::FileVersionType dbFileType) override {
return StorageUtils::getNodesStatisticsAndDeletedIDsFilePath(vfs, directory, dbFileType);
std::string getTableStatisticsFilePath(const std::string& directory,
common::FileVersionType dbFileType, common::VirtualFileSystem* fs) override {
return StorageUtils::getNodesStatisticsAndDeletedIDsFilePath(fs, directory, dbFileType);
}

private:
inline NodeTableStatsAndDeletedIDs* getNodeTableStats(
transaction::TransactionType transactionType, common::table_id_t tableID) const {
NodeTableStatsAndDeletedIDs* getNodeTableStats(transaction::TransactionType transactionType,
common::table_id_t tableID) const {
return transactionType == transaction::TransactionType::READ_ONLY ?
dynamic_cast<NodeTableStatsAndDeletedIDs*>(
readOnlyVersion->tableStatisticPerTable.at(tableID).get()) :
Expand Down
20 changes: 9 additions & 11 deletions src/include/storage/stats/rel_table_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,29 @@ class RelTableStats : public TableStatistics {

RelTableStats(const RelTableStats& other);

inline common::offset_t getNextRelOffset() const { return nextRelOffset; }
inline void incrementNextRelOffset(uint64_t numTuples) { nextRelOffset += numTuples; }
common::offset_t getNextRelOffset() const { return nextRelOffset; }
void incrementNextRelOffset(uint64_t numTuples) { nextRelOffset += numTuples; }

inline void addMetadataDAHInfoForColumn(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo,
void addMetadataDAHInfoForColumn(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo,
common::RelDataDirection direction) {
auto& metadataDAHInfos = getDirectedMetadataDAHInfosRef(direction);
metadataDAHInfos.push_back(std::move(metadataDAHInfo));
}
inline void removeMetadataDAHInfoForColumn(common::column_id_t columnID,
void removeMetadataDAHInfoForColumn(common::column_id_t columnID,
common::RelDataDirection direction) {
auto& metadataDAHInfos = getDirectedMetadataDAHInfosRef(direction);
KU_ASSERT(columnID < metadataDAHInfos.size());
metadataDAHInfos.erase(metadataDAHInfos.begin() + columnID);
}
inline MetadataDAHInfo* getCSROffsetMetadataDAHInfo(common::RelDataDirection direction) {
MetadataDAHInfo* getCSROffsetMetadataDAHInfo(common::RelDataDirection direction) {
return direction == common::RelDataDirection::FWD ? fwdCSROffsetMetadataDAHInfo.get() :
bwdCSROffsetMetadataDAHInfo.get();
}
inline MetadataDAHInfo* getCSRLengthMetadataDAHInfo(common::RelDataDirection direction) {
MetadataDAHInfo* getCSRLengthMetadataDAHInfo(common::RelDataDirection direction) {
return direction == common::RelDataDirection::FWD ? fwdCSRLengthMetadataDAHInfo.get() :
bwdCSRLengthMetadataDAHInfo.get();
}
inline MetadataDAHInfo* getColumnMetadataDAHInfo(common::column_id_t columnID,
MetadataDAHInfo* getColumnMetadataDAHInfo(common::column_id_t columnID,
common::RelDataDirection direction) {
auto& metadataDAHInfos = getDirectedMetadataDAHInfosRef(direction);
KU_ASSERT(columnID < metadataDAHInfos.size());
Expand All @@ -58,12 +58,10 @@ class RelTableStats : public TableStatistics {
static std::unique_ptr<RelTableStats> deserialize(uint64_t numRels, common::table_id_t tableID,
common::Deserializer& deserializer);

inline std::unique_ptr<TableStatistics> copy() final {
return std::make_unique<RelTableStats>(*this);
}
std::unique_ptr<TableStatistics> copy() final { return std::make_unique<RelTableStats>(*this); }

private:
inline std::vector<std::unique_ptr<MetadataDAHInfo>>& getDirectedMetadataDAHInfosRef(
std::vector<std::unique_ptr<MetadataDAHInfo>>& getDirectedMetadataDAHInfosRef(
common::RelDataDirection direction) {
return direction == common::RelDataDirection::FWD ? fwdMetadataDAHInfos :
bwdMetadataDAHInfos;
Expand Down
Loading
Loading